# PySpark

## Installation (using `conda`)

```sh
$ conda create --name pyspark_env
$ conda activate pyspark_env
$ conda install --channel conda-forge pyspark python
$ pip install jupyterlab pandas
$ jupyter lab
```

###### References:
* https://spark.apache.org/docs/latest/api/python/getting_started/install.html

## Initialization of SparkSession

PySpark applications start with initializing SparkSession which is the entry point of PySpark as below.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

24/08/04 00:05:59 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 172.18.0.1 instead (on interface br-6287b9e8d433)
24/08/04 00:05:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/04 00:06:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/04 00:06:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


###### References:
* https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df

## DataFrame Creation

In [24]:
from pyspark.sql.functions import upper
from pyspark.sql import Row, Column
from datetime import date, datetime
import pandas as pd

In [3]:
# Create a PySpark DataFrame from a list of rows.
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [4]:
# Create a PySpark DataFrame with an explicit schema.
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [5]:
# Create a PySpark DataFrame from a pandas DataFrame.
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [12]:
# All DataFrames above result same.
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [13]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [7]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [11]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [14]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [17]:
df.select('a', 'b', 'c').describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   NULL|
| stddev|1.0|1.0|   NULL|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [19]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [26]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [28]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [32]:
df.write.csv('data/output/pyspark/explample.csv', header=True, mode='overwrite')
spark.read.csv('data/output/pyspark/explample.csv', header=True).show()

+---+---+-------+----------+--------------------+
|  a|  b|      c|         d|                   e|
+---+---+-------+----------+--------------------+
|  1|2.0|string1|2000-01-01|2000-01-01T12:00:...|
|  3|4.0|string3|2000-03-01|2000-01-03T12:00:...|
|  2|3.0|string2|2000-02-01|2000-01-02T12:00:...|
+---+---+-------+----------+--------------------+



In [33]:
df.write.parquet('data/output/pyspark/explample.parquet', mode='overwrite')
spark.read.parquet('data/output/pyspark/explample.parquet').show()

                                                                                

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



In [36]:
# Working with SQL
df.createOrReplaceTempView('tableA')
spark.sql('select count(*) as cuenta from tableA').show()

+------+
|cuenta|
+------+
|     3|
+------+

