# PySpark - intro
Andrzej Kocielski, 16-02-2022
___

## Install PySpark

In [1]:
# %pip install pyspark==3.2.1  # run once per environment; %pip installs into the running kernel's environment

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 55 kB/s  eta 0:00:01
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 3.1 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853646 sha256=84377529f54ab41945923bd24f52d1a1bdde8f13038b2d16abbdcafd63d003b7
  Stored in directory: /home/ak/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## PySpark documentation
https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html

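Important Spark concepts:
1. **Immutable** (a DataFrame is never modified in place; every transformation returns a new DataFrame and the original data stays intact)
2. **Lazy evaluation** (transformations are only recorded; nothing is computed until an action such as `show()` or `take()` explicitly asks for a result)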

### Import PySpark

In [4]:
import pyspark

In [8]:
from pyspark.sql import SparkSession

# Single entry point to the DataFrame API for the rest of the notebook.
# getOrCreate() hands back the already-running session when there is one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## Generate data

In [11]:
from datetime import datetime, date

# Three sample records, each laid out as (int, float, str, date, datetime).
sample_rows = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
column_names = ['a', 'b', 'c', 'd', 'e']

# Parallelize the local list into an RDD (resilient distributed dataset).
rdd = spark.sparkContext.parallelize(sample_rows)

# Wrap the RDD in a Spark DataFrame; only the column names are supplied,
# the column types are inferred from the values.
df = spark.createDataFrame(rdd, schema=column_names)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [12]:
# Print at most 3 rows as a text table.
df.show(n=3)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [13]:
# Fetch the first 2 rows as a Python list of Row objects.
df.take(num=2)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

In [14]:
# Column names, returned as a plain Python list of strings.
df.columns

['a', 'b', 'c', 'd', 'e']

In [16]:
# Print the schema as a tree: one line per column with its type and nullability.
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [17]:
# Summary statistics (count, mean, stddev, min, max) for three columns;
# mean and stddev come back as null for the string column.
(
    df
    .select("a", "b", "c")
    .describe()
    .show()
)

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [20]:
# Project the DataFrame down to three columns and print the rows.
(
    df
    .select("a", "b", "c")
    .show()
)

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  3|4.0|string3|
+---+---+-------+



### Get data from file

In [15]:
# Default CSV read relies on many assumptions: no header row is expected, so the
# columns get generated names (_c0, _c1, ...) and every column is read as a string.
df2 = spark.read.csv(path="music.csv")

# Evaluating the DataFrame on its own only shows its schema, not its rows.
df2

DataFrame[_c0: string, _c1: string, _c2: string]

In [22]:
# Read the file again, this time taking the column names from the first line
# and letting Spark infer the column types from the data.
df2 = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("music.csv")
)

# Unlike evaluating `df2` on its own, show() prints the actual rows.
df2.show()

+---+------+---------+
|age|gender|    genre|
+---+------+---------+
| 20|     1|   HipHop|
| 23|     1|   HipHop|
| 25|     1|   HipHop|
| 26|     1|     Jazz|
| 29|     1|     Jazz|
| 30|     1|     Jazz|
| 31|     1|Classical|
| 33|     1|Classical|
| 37|     1|Classical|
| 20|     0|    Dance|
| 21|     0|    Dance|
| 25|     0|    Dance|
| 26|     0| Acoustuc|
| 27|     0| Acoustuc|
| 30|     0| Acoustuc|
| 31|     0|Classical|
| 34|     0|Classical|
| 35|     0|Classical|
+---+------+---------+



In [23]:
# Same attribute name as in pandas; the result here is a plain list of column names.
df2.columns

['age', 'gender', 'genre']

In [24]:
# Same attribute name as in pandas, but the result here is a list of
# (column name, type name) tuples rather than a pandas Series.
df2.dtypes

[('age', 'int'), ('gender', 'int'), ('genre', 'string')]

Rename columns (remember: DataFrames are immutable, so renaming returns a new DataFrame and `df2` itself is unchanged)

In [32]:
# withColumnRenamed returns a new DataFrame; the result is only displayed here,
# not assigned, so df2 keeps its original 'age' column.
(
    df2
    .withColumnRenamed('age', 'a')
    .show(2)
)

+---+------+------+
|  a|gender| genre|
+---+------+------+
| 20|     1|HipHop|
| 23|     1|HipHop|
+---+------+------+
only showing top 2 rows



Filtering

In [41]:
# Keep only the rows whose genre is 'Jazz'. Indexing a DataFrame with a
# boolean Column is shorthand for passing that Column to filter().
df2.filter(df2.genre == 'Jazz').show()

+---+------+-----+
|age|gender|genre|
+---+------+-----+
| 26|     1| Jazz|
| 29|     1| Jazz|
| 30|     1| Jazz|
+---+------+-----+



In [44]:
# Combine two conditions with & (each side needs its own parentheses):
# Jazz listeners older than 28.
(
    df2
    .filter((df2.genre == 'Jazz') & (df2.age > 28))
    .show()
)

+---+------+-----+
|age|gender|genre|
+---+------+-----+
| 29|     1| Jazz|
| 30|     1| Jazz|
+---+------+-----+

