## Spark context

In [3]:
%pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)

In [1]:
from pyspark import SparkContext

In [9]:
sc = SparkContext(appName="AppName")

In [10]:
type(sc)

pyspark.context.SparkContext

In [11]:
sc.stop()

In [12]:
from pyspark import SparkConf
sc = SparkContext(conf=SparkConf().setAppName("MyApp").set("spark.executor.memory", "2g"))

In [13]:
sc.stop()

## Spark session

In [14]:
from pyspark.sql import SparkSession

In [15]:
spark = SparkSession.builder.appName("MyApp").getOrCreate()

In [16]:
spark.stop()

## Spark RDD

In [17]:
with open("numbers.txt", "w") as file:
    for i in range(1, 11):
        file.write(str(i) + "\n")

sc = SparkContext(appName="RDDExample")
data = sc.textFile("numbers.txt")

In [19]:
data.collect()

['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

In [25]:
numbers = data.map(lambda x: int(x))
filtered_numbers = numbers.filter(lambda x: x % 2 == 0)
sum_of_numbers = filtered_numbers.reduce(lambda x, y: x + y)

In [27]:
sum_of_numbers

30

In [28]:
sc.stop()
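
As a sanity check on the result above, the same map, filter, and reduce steps can be mirrored in plain Python without Spark. This is only a local analogue (the variable names are illustrative and nothing here is distributed), but it shows why the pipeline yields 30: the even numbers among 1 through 10 are 2, 4, 6, 8, and 10.

```python
from functools import reduce

# Same content that numbers.txt holds: the strings "1" .. "10".
lines = [str(i) for i in range(1, 11)]

numbers = map(int, lines)                          # like data.map(lambda x: int(x))
evens = filter(lambda x: x % 2 == 0, numbers)      # like numbers.filter(...)
sum_of_evens = reduce(lambda x, y: x + y, evens)   # like filtered_numbers.reduce(...)

print(sum_of_evens)  # 30
```

Like their RDD counterparts, `map` and `filter` here are lazy iterators; only `reduce` forces the values to be computed.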

# Quickstart: DataFrame

This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated and are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not compute the result immediately; it only plans how to compute it later. The computation starts when an [action](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` is explicitly called.
This notebook shows basic DataFrame usage and is aimed mainly at new users. You can run the latest version of these examples yourself in 'Live Notebook: DataFrame' on [the quickstart page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html).
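
The lazy model described above can be pictured with plain Python generators. This is a loose analogy, not how Spark is implemented: the `log` list and the `double` helper are hypothetical names used only to make the timing of the work visible.

```python
log = []  # records each element at the moment it is actually processed

def double(x):
    log.append(x)
    return x * 2

source = range(1, 4)

# "Transformation": builds a plan (a generator) but computes nothing yet.
planned = (double(x) for x in source)
work_before_action = len(log)   # still 0

# "Action": materializing the result forces the computation.
result = list(planned)
work_after_action = len(log)    # now 3

print(work_before_action, work_after_action, result)  # 0 3 [2, 4, 6]
```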

The Apache Spark documentation site has more useful material; see the latest versions of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).

PySpark applications start by initializing a `SparkSession`, the entry point of PySpark, as shown below. When you run the PySpark shell via the `pyspark` executable, the shell automatically creates the session in the variable `spark`.

In [29]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()

## DataFrame Creation

A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame`, typically by passing a list of lists, tuples, dictionaries, or `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), or an RDD consisting of such a list.
`pyspark.sql.SparkSession.createDataFrame` takes a `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the schema by taking a sample from the data.

First, you can create a PySpark DataFrame from a list of rows:

In [30]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [31]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Create a PySpark DataFrame with an explicit schema:

In [3]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

Create a PySpark DataFrame from a pandas DataFrame:

In [32]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

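The DataFrames created above all have the same schema. The last two also hold identical rows; the `Row`-based one differs only in its third row (`a=4`, `b=5.0` instead of `a=3`, `b=4.0`).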

In [33]:
# `df` now holds the pandas-based DataFrame; show its rows and schema.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



## Viewing Data

The top rows of a DataFrame can be displayed using `DataFrame.show()`.

In [7]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



Alternatively, you can enable the `spark.sql.repl.eagerEval.enabled` configuration so that PySpark DataFrames are evaluated eagerly in notebooks such as Jupyter. The number of rows to show is controlled by the `spark.sql.repl.eagerEval.maxNumRows` configuration.

In [8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

| a | b | c | d | e |
|---|---|---|---|---|
| 1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
| 2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
| 3 | 4.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |


The rows can also be shown vertically. This is useful when rows are too long to show horizontally.

In [9]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



You can see the DataFrame's schema and column names as follows:

In [10]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [11]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



Show a summary of the DataFrame:

In [12]:
df.select("a", "b", "c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



`DataFrame.collect()` brings the distributed data to the driver as local Python objects. Note that this can raise an out-of-memory error when the dataset is too large to fit on the driver, because it collects all the data from the executors.

In [13]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

To avoid an out-of-memory error, use `DataFrame.take()` or `DataFrame.tail()`, which return only a limited number of rows.

In [14]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
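
To picture why `take()` and `tail()` are safer than `collect()`, here is a plain-Python sketch in which an iterator stands in for the distributed rows. It is an analogy under simplifying assumptions, not Spark's implementation; `rows`, `take`, and `tail` are illustrative helpers. The point is that the caller ends up holding at most `n` items.

```python
from collections import deque
from itertools import islice

def rows(total):
    """Stand-in for a large dataset: yields one row at a time."""
    for i in range(total):
        yield {"a": i}

def take(it, n):
    # Stops consuming the source after the first n rows.
    return list(islice(it, n))

def tail(it, n):
    # Scans the whole source but keeps only the last n rows in memory.
    return list(deque(it, maxlen=n))

first = take(rows(100_000), 2)
last = tail(rows(100_000), 2)
print(first)  # [{'a': 0}, {'a': 1}]
print(last)   # [{'a': 99998}, {'a': 99999}]
```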

A PySpark DataFrame can also be converted back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage the pandas API. Note that `toPandas` also collects all data to the driver, which can easily cause an out-of-memory error when the data is too large to fit there.

In [15]:
df.toPandas()

|   | a | b | c | d | e |
|---|---|---|---|---|---|
| 0 | 1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
| 1 | 2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
| 2 | 3 | 4.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |


## Selecting and Accessing Data

PySpark DataFrames are lazily evaluated: simply selecting a column does not trigger computation; it returns a `Column` instance.

In [38]:
df.select("a", "b", "c").collect()[1][2]

'string2'

In [16]:
df.a

Column<'a'>

In fact, most column-wise operations return `Column`s.

## Spark DataFrame functions

In [41]:
from pyspark.sql import functions as F

In [54]:
(
    df
    .select(
        F.abs(F.col("a")).alias("abs_a"),
        F.sqrt(F.col("b")).alias("sqrt_b"),
        F.substring(F.col("c"), 4, 2).alias("substr_c"),
        F.concat(F.col("c"), F.col("a").cast('string')).alias("c_a"),
        F.when(F.col("d") == date(2000,2,1), 1).otherwise(0).alias("is_febr"),
        F.when(F.col("d") == date(2000,2,1), 2)
         .when(F.col("d") == date(2000,3,1), 3)
         .otherwise(1).alias("month")
    )
).show()

+-----+------------------+--------+--------+-------+-----+
|abs_a|            sqrt_b|substr_c|     c_a|is_febr|month|
+-----+------------------+--------+--------+-------+-----+
|    1|1.4142135623730951|      in|string11|      0|    1|
|    2|1.7320508075688772|      in|string22|      1|    2|
|    3|               2.0|      in|string33|      0|    3|
+-----+------------------+--------+--------+-------+-----+



In [55]:
spark.stop()

## Working with SQL

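DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably. For example, you can register a DataFrame as a temporary view and query it with SQL, as below. The cells in this section assume an active `SparkSession` (the one above was stopped and must be recreated) and a DataFrame `df` with an integer column `v1`; the outputs shown come from a run in which `df` had eight rows.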

In [30]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In addition, UDFs can be registered and invoked in SQL out of the box:

In [31]:
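import pandas as pd
from pyspark.sql.functions import pandas_udf

# Vectorized UDF: receives a pandas Series and returns one of the same length.
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

# Register the UDF under a name that SQL queries can call.
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()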

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



These SQL expressions can directly be mixed and used as PySpark columns.

In [32]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



# California Housing Dataset

In [57]:
import pandas as pd
spark = SparkSession.builder.appName("MyApp").getOrCreate()
pandas_df = pd.read_csv(r"https://raw.githubusercontent.com/timothypesi/Data-Sets-For-Machine-Learning-/refs/heads/main/california_housing_train.csv")
data = spark.createDataFrame(pandas_df)
data.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

### Using SQL queries in Spark

In [58]:
# Create a view
data.createOrReplaceTempView("housing")

In [65]:
(
    spark
    .sql("""SELECT households, CASE WHEN households < 500 THEN 'small' ELSE 'large' END FROM housing""")
).show()

+----------+------------------------------------------------------+
|households|CASE WHEN (households < 500) THEN small ELSE large END|
+----------+------------------------------------------------------+
|     472.0|                                                 small|
|     463.0|                                                 small|
|     117.0|                                                 small|
|     226.0|                                                 small|
|     262.0|                                                 small|
|     239.0|                                                 small|
|     633.0|                                                 large|
|     158.0|                                                 small|
|    1056.0|                                                 large|
|     271.0|                                                 small|
|     824.0|                                                 large|
|     437.0|                                    

In [67]:
(
    data
    .select(
        F.col('households'),
        F.when(F.col('households') < 500, 'small').otherwise('large')
    )
).show()

+----------+------------------------------------------------------+
|households|CASE WHEN (households < 500) THEN small ELSE large END|
+----------+------------------------------------------------------+
|     472.0|                                                 small|
|     463.0|                                                 small|
|     117.0|                                                 small|
|     226.0|                                                 small|
|     262.0|                                                 small|
|     239.0|                                                 small|
|     633.0|                                                 large|
|     158.0|                                                 small|
|    1056.0|                                                 large|
|     271.0|                                                 small|
|     824.0|                                                 large|
|     437.0|                                    

In [68]:
(
    data
    .withColumn("hh_size", F.when(F.col('households') < 500, 'small').otherwise('large'))
    .withColumn("age", F.when(F.col('housing_median_age') <= 20, 'young').otherwise('old'))
).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------+-----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|hh_size|  age|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------+-----+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  small|young|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  small|young|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|  small|young|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  small|young|
|  -114.57|   33.57|              20.0|     1454

In [69]:
(
    data
    .withColumn("hh_size", F.when(F.col('households') < 500, 'small').otherwise('large'))
    .withColumn("age", F.when(F.col('housing_median_age') <= 20, 'young').otherwise('old'))
    .groupBy(F.col("hh_size"))
    .agg(
        F.count("households").alias("observations"),
        F.median("median_house_value").alias("median_house_value"),
        F.max("housing_median_age").alias("max_age"),
    )
).show()

+-------+------------+------------------+-------+
|hh_size|observations|median_house_value|max_age|
+-------+------------+------------------+-------+
|  small|       10854|          171600.0|   52.0|
|  large|        6146|          191900.0|   52.0|
+-------+------------+------------------+-------+



In [70]:
(
    data
    .withColumn("hh_size", F.when(F.col('households') < 500, 'small').otherwise('large'))
    .withColumn("age", F.when(F.col('housing_median_age') <= 20, 'young').otherwise('old'))
    .groupBy(F.col("hh_size"), F.col("age"))
    .agg(
        F.count("households").alias("observations"),
        F.median("median_house_value").alias("median_house_value"),
        F.sum("population").alias("population"),
    )
).show()

+-------+-----+------------+------------------+----------+
|hh_size|  age|observations|median_house_value|population|
+-------+-----+------------+------------------+----------+
|  small|  old|        8279|          175900.0| 7596300.0|
|  small|young|        2575|          162300.0| 2324571.0|
|  large|  old|        3512|          211100.0| 7263379.0|
|  large|young|        2634|          174400.0| 7118507.0|
+-------+-----+------------+------------------+----------+



In [71]:
agg_df1 = (
    data
    .withColumn("hh_size", F.when(F.col('households') < 500, 'small').otherwise('large'))
    .withColumn("age", F.when(F.col('housing_median_age') <= 20, 'young').otherwise('old'))
    .groupBy(F.col("hh_size"), F.col("age"))
    .agg(
        F.sum("population").alias("population"),
    )
)

agg_df2 = (
    data
    .withColumn("hh_size", F.when(F.col('households') < 500, 'small').otherwise('large'))
    .withColumn("age", F.when(F.col('housing_median_age') <= 20, 'young').otherwise('old'))
    .groupBy(F.col("hh_size"), F.col("age"))
    .agg(
        F.sum("households").alias("households"),
    )
)

In [72]:
agg_df1.show()

+-------+-----+----------+
|hh_size|  age|population|
+-------+-----+----------+
|  small|  old| 7596300.0|
|  small|young| 2324571.0|
|  large|  old| 7263379.0|
|  large|young| 7118507.0|
+-------+-----+----------+



In [73]:
agg_df2.show()

+-------+-----+----------+
|hh_size|  age|households|
+-------+-----+----------+
|  small|  old| 2540135.0|
|  small|young|  785900.0|
|  large|  old| 2635590.0|
|  large|young| 2559148.0|
+-------+-----+----------+



In [75]:
(
    agg_df1
    .join(agg_df2, how="left", on=["hh_size", "age"])
    .withColumn("pop_per_hh", F.col("population") / F.col("households"))
).show()

+-------+-----+----------+----------+------------------+
|hh_size|  age|population|households|        pop_per_hh|
+-------+-----+----------+----------+------------------+
|  small|  old| 7596300.0| 2540135.0| 2.990510346891012|
|  small|young| 2324571.0|  785900.0| 2.957845781906095|
|  large|  old| 7263379.0| 2635590.0|2.7558835023656942|
|  large|young| 7118507.0| 2559148.0|2.7815925456440973|
+-------+-----+----------+----------+------------------+
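
The join-then-divide step above can be checked by hand. The sketch below redoes it in plain Python using the aggregated values printed in the two tables; the dictionaries and variable names are illustrative stand-ins for `agg_df1` and `agg_df2`, keyed on the same `(hh_size, age)` pair that the join uses.

```python
# Aggregates copied from the agg_df1 and agg_df2 outputs above.
population = {
    ("small", "old"): 7596300.0,
    ("small", "young"): 2324571.0,
    ("large", "old"): 7263379.0,
    ("large", "young"): 7118507.0,
}
households = {
    ("small", "old"): 2540135.0,
    ("small", "young"): 785900.0,
    ("large", "old"): 2635590.0,
    ("large", "young"): 2559148.0,
}

# Left join on the key, then derive population per household.
pop_per_hh = {}
for key, pop in population.items():
    hh = households.get(key)  # None would mean no match on the right side
    pop_per_hh[key] = pop / hh if hh else None

for key, value in pop_per_hh.items():
    print(key, round(value, 4))
```

Every group comes out close to three people per household, with the "small" groups slightly higher than the "large" ones.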

