# Spark DataFrame
* Tabular data
  * Rows 
  * Named Columns
* Immutable 
* Distribute collection of data
* Lazy
* Can process structured and semi-structured data
  * relational database
  * csv
  * json
  * txt
  * RDD
  * dict
  * list
  * etc
* Support SQL or expression methods
  * SELECT * FROM RedWine
  * red_wine_df.select()
* Schema
  * Information about
    * column name
    * data type
    * empty values
    * etc
  * Help to optimize the queries

In [3]:
from pyspark import SparkContext 

from pyspark.sql import SparkSession

master = 'spark://192.168.2.102:7077' # Connect to remote server
appName = 'Create DataFrame'

# RDD
sc = SparkContext(master=master, appName=appName)

# DataFrame
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Create DataFrame, master=spark://192.168.2.102:7077) created by __init__ at /tmp/ipykernel_1898442/1114236199.py:9 

In [4]:
df = spark.createDataFrame([
    {'Id': 1, 'Value': 1},
    {'Id': 1, 'Value': 2},
    {'Id': 2, 'Value': 3},
    {'Id': 2, 'Value': 4},
])
df.show()

21/12/22 12:18:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources



### Create DataFrame from RDD

In [2]:
red_wine_rdd = sc.parallelize([
    [7.4, 0.7, 0.0, 5],
    [7.8, 0.88, 0.0, 5],
    [7.8, 0.76, 0.04, 5],
    [11.2, 0.28, 0.56, 6],
])

red_wine_rdd.take(5)

                                                                                

[[7.4, 0.7, 0.0, 5],
 [7.8, 0.88, 0.0, 5],
 [7.8, 0.76, 0.04, 5],
 [11.2, 0.28, 0.56, 6]]

In [3]:
columns = ['fixed acidity', 'volatile acidity', 'citric acid', 'quality']

red_wine_df = spark.createDataFrame(red_wine_rdd, schema=columns)

red_wine_df.show()

                                                                                

+-------------+----------------+-----------+-------+
|fixed acidity|volatile acidity|citric acid|quality|
+-------------+----------------+-----------+-------+
|          7.4|             0.7|        0.0|      5|
|          7.8|            0.88|        0.0|      5|
|          7.8|            0.76|       0.04|      5|
|         11.2|            0.28|       0.56|      6|
+-------------+----------------+-----------+-------+



In [4]:
red_wine_df.dtypes

[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('quality', 'bigint')]

### Create DataFrame from csv

In [7]:
root_path = 'hdfs://192.168.2.102:9000/dataset/{filename}'

# By Default inferSchema is False
red_wine_df = spark.read.csv(root_path.format(filename='winequality-red.csv'), header=True, inferSchema=True)

red_wine_df.show(5)

                                                                                

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [8]:
red_wine_df.dtypes

[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('residual sugar', 'double'),
 ('chlorides', 'double'),
 ('free sulfur dioxide', 'double'),
 ('total sulfur dioxide', 'double'),
 ('density', 'double'),
 ('pH', 'double'),
 ('sulphates', 'double'),
 ('alcohol', 'double'),
 ('quality', 'int')]

### Create DataFrame from txt

In [9]:
temp_hist = spark.read.text('data/beer_temp_hist.txt')
temp_hist.show(5)

+-----------------+
|            value|
+-----------------+
|2021-12-01;1;20.0|
|2021-12-02;1;20.2|
|    2021-12-03;1;|
|2021-12-04;1;20.3|
|2021-12-05;1;20.5|
+-----------------+
only showing top 5 rows



In [10]:
rdd = sc.textFile('data/beer_temp_hist.txt')
rdd.take(5)

['2021-12-01;1;20.0',
 '2021-12-02;1;20.2',
 '2021-12-03;1;',
 '2021-12-04;1;20.3',
 '2021-12-05;1;20.5']

In [11]:
splitted_rows = rdd.map(lambda row: row.split(';'))
temp_hist = spark.createDataFrame(splitted_rows, schema=['Date', 'BeerId', 'Temp'])
temp_hist.show()

+----------+------+----+
|      Date|BeerId|Temp|
+----------+------+----+
|2021-12-01|     1|20.0|
|2021-12-02|     1|20.2|
|2021-12-03|     1|    |
|2021-12-04|     1|20.3|
|2021-12-05|     1|20.5|
|2021-12-01|     2|16.5|
|2021-12-02|     2|16.4|
|2021-12-03|     2|16.5|
|2021-12-04|     2|    |
|2021-12-05|     2|16.8|
|2021-12-05|     2|16.7|
|2021-12-01|     3|18.3|
|2021-12-02|     3|18.4|
|2021-12-03|     3|    |
|2021-12-01|     4|18.2|
+----------+------+----+



In [12]:
temp_hist.dtypes

[('Date', 'string'), ('BeerId', 'string'), ('Temp', 'string')]

### Infer data type

In [13]:
red_wine_df = spark.read.csv(root_path.format(filename='winequality-red.csv'), header=True, inferSchema=True)
red_wine_df.dtypes

[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('residual sugar', 'double'),
 ('chlorides', 'double'),
 ('free sulfur dioxide', 'double'),
 ('total sulfur dioxide', 'double'),
 ('density', 'double'),
 ('pH', 'double'),
 ('sulphates', 'double'),
 ('alcohol', 'double'),
 ('quality', 'int')]

### Schema

In [16]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, FloatType, DateType, StringType

In [17]:
temp_hist_schema = StructType([
    StructField('Date', DateType()),
    StructField('BeerId', IntegerType()),
    StructField('Temp', FloatType()),
])

In [18]:

temp_hist_df = spark.read.csv('data/beer_temp_hist.txt', sep=';', schema=temp_hist_schema)
temp_hist_df.show()

+----------+------+----+
|      Date|BeerId|Temp|
+----------+------+----+
|2021-12-01|     1|20.0|
|2021-12-02|     1|20.2|
|2021-12-03|     1|null|
|2021-12-04|     1|20.3|
|2021-12-05|     1|20.5|
|2021-12-01|     2|16.5|
|2021-12-02|     2|16.4|
|2021-12-03|     2|16.5|
|2021-12-04|     2|null|
|2021-12-05|     2|16.8|
|2021-12-05|     2|16.7|
|2021-12-01|     3|18.3|
|2021-12-02|     3|18.4|
|2021-12-03|     3|null|
|2021-12-01|     4|18.2|
+----------+------+----+



In [19]:
temp_hist_df.dtypes

[('Date', 'date'), ('BeerId', 'int'), ('Temp', 'float')]

In [20]:
temp_hist_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- BeerId: integer (nullable = true)
 |-- Temp: float (nullable = true)



<hr/>

### Merge DataFrames

In [29]:
from pyspark.sql import functions as F

beer_df = beer_df.withColumn('ElapsedDays', F.datediff(beer_df.Date, beer_df.InitialDate))

beer_df.show()

+---+-----------+--------+----------+----+------------------+-----------+
| Id|InitialDate|    Type|      Date|   C|                 F|ElapsedDays|
+---+-----------+--------+----------+----+------------------+-----------+
|  1| 2021-12-01|   Laget|2021-12-05|20.5|              68.9|          4|
|  1| 2021-12-01|   Laget|2021-12-04|20.3| 68.53999862670898|          3|
|  1| 2021-12-01|   Laget|2021-12-03|null|              null|          2|
|  1| 2021-12-01|   Laget|2021-12-02|20.2| 68.36000137329103|          1|
|  1| 2021-12-01|   Laget|2021-12-01|20.0|              68.0|          0|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.7|62.060001373291016|          4|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.8| 62.23999862670898|          4|
|  2| 2021-12-01|Pale Ale|2021-12-04|null|              null|          3|
|  2| 2021-12-01|Pale Ale|2021-12-03|16.5|              61.7|          2|
|  2| 2021-12-01|Pale Ale|2021-12-02|16.4| 61.51999931335449|          1|
|  2| 2021-12-01|Pale Ale|2021-12-01|1

In [30]:
beer_df = beer_df.withColumn('FirstDay', beer_df.ElapsedDays == 0)
beer_df.show()

+---+-----------+--------+----------+----+------------------+-----------+--------+
| Id|InitialDate|    Type|      Date|   C|                 F|ElapsedDays|FirstDay|
+---+-----------+--------+----------+----+------------------+-----------+--------+
|  1| 2021-12-01|   Laget|2021-12-05|20.5|              68.9|          4|   false|
|  1| 2021-12-01|   Laget|2021-12-04|20.3| 68.53999862670898|          3|   false|
|  1| 2021-12-01|   Laget|2021-12-03|null|              null|          2|   false|
|  1| 2021-12-01|   Laget|2021-12-02|20.2| 68.36000137329103|          1|   false|
|  1| 2021-12-01|   Laget|2021-12-01|20.0|              68.0|          0|    true|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.7|62.060001373291016|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.8| 62.23999862670898|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-04|null|              null|          3|   false|
|  2| 2021-12-01|Pale Ale|2021-12-03|16.5|              61.7|          2|   false|
|  2



#### Select and filter

In [31]:
bear_id = beer_df.select('Id')
bear_id.show(2)

+---+
| Id|
+---+
|  1|
|  1|
+---+
only showing top 2 rows



In [32]:
beer_2 = beer_df.filter(beer_df.Id == 2)
beer_2.show()

+---+-----------+--------+----------+----+------------------+-----------+--------+
| Id|InitialDate|    Type|      Date|   C|                 F|ElapsedDays|FirstDay|
+---+-----------+--------+----------+----+------------------+-----------+--------+
|  2| 2021-12-01|Pale Ale|2021-12-05|16.7|62.060001373291016|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.8| 62.23999862670898|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-04|null|              null|          3|   false|
|  2| 2021-12-01|Pale Ale|2021-12-03|16.5|              61.7|          2|   false|
|  2| 2021-12-01|Pale Ale|2021-12-02|16.4| 61.51999931335449|          1|   false|
|  2| 2021-12-01|Pale Ale|2021-12-01|16.5|              61.7|          0|    true|
+---+-----------+--------+----------+----+------------------+-----------+--------+



### Fill Missing values

In [33]:
beer_df.show()

+---+-----------+--------+----------+----+------------------+-----------+--------+
| Id|InitialDate|    Type|      Date|   C|                 F|ElapsedDays|FirstDay|
+---+-----------+--------+----------+----+------------------+-----------+--------+
|  1| 2021-12-01|   Laget|2021-12-05|20.5|              68.9|          4|   false|
|  1| 2021-12-01|   Laget|2021-12-04|20.3| 68.53999862670898|          3|   false|
|  1| 2021-12-01|   Laget|2021-12-03|null|              null|          2|   false|
|  1| 2021-12-01|   Laget|2021-12-02|20.2| 68.36000137329103|          1|   false|
|  1| 2021-12-01|   Laget|2021-12-01|20.0|              68.0|          0|    true|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.7|62.060001373291016|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.8| 62.23999862670898|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-04|null|              null|          3|   false|
|  2| 2021-12-01|Pale Ale|2021-12-03|16.5|              61.7|          2|   false|
|  2

In [34]:
beer_df = beer_df.fillna('Unknown', subset=['Type'])
beer_df.show()

+---+-----------+--------+----------+----+------------------+-----------+--------+
| Id|InitialDate|    Type|      Date|   C|                 F|ElapsedDays|FirstDay|
+---+-----------+--------+----------+----+------------------+-----------+--------+
|  1| 2021-12-01|   Laget|2021-12-05|20.5|              68.9|          4|   false|
|  1| 2021-12-01|   Laget|2021-12-04|20.3| 68.53999862670898|          3|   false|
|  1| 2021-12-01|   Laget|2021-12-03|null|              null|          2|   false|
|  1| 2021-12-01|   Laget|2021-12-02|20.2| 68.36000137329103|          1|   false|
|  1| 2021-12-01|   Laget|2021-12-01|20.0|              68.0|          0|    true|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.7|62.060001373291016|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-05|16.8| 62.23999862670898|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-04|null|              null|          3|   false|
|  2| 2021-12-01|Pale Ale|2021-12-03|16.5|              61.7|          2|   false|
|  2

In [35]:
mean = beer_df.agg(F.mean('C')).collect()[0].asDict()['avg(C)']
print(mean)
beer_df.fillna(mean, 'C').show()

                                                                                

18.233333269755047
+---+-----------+--------+----------+---------+------------------+-----------+--------+
| Id|InitialDate|    Type|      Date|        C|                 F|ElapsedDays|FirstDay|
+---+-----------+--------+----------+---------+------------------+-----------+--------+
|  1| 2021-12-01|   Laget|2021-12-05|     20.5|              68.9|          4|   false|
|  1| 2021-12-01|   Laget|2021-12-04|     20.3| 68.53999862670898|          3|   false|
|  1| 2021-12-01|   Laget|2021-12-03|18.233334|              null|          2|   false|
|  1| 2021-12-01|   Laget|2021-12-02|     20.2| 68.36000137329103|          1|   false|
|  1| 2021-12-01|   Laget|2021-12-01|     20.0|              68.0|          0|    true|
|  2| 2021-12-01|Pale Ale|2021-12-05|     16.7|62.060001373291016|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-05|     16.8| 62.23999862670898|          4|   false|
|  2| 2021-12-01|Pale Ale|2021-12-04|18.233334|              null|          3|   false|
|  2| 2021-12

In [37]:
from pyspark.sql.window import Window

window = Window\
    .partitionBy('Id')\
    .orderBy('Date')\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled_cols = F.last(beer_df.F, ignorenulls=True).over(window)
beer_df = beer_df.withColumn('F[Filled]', filled_cols)

beer_df.select(['Id', 'Date', 'F', 'F[Filled]']).orderBy(['Id', 'Date']).show()



+---+----------+------------------+------------------+
| Id|      Date|                 F|         F[Filled]|
+---+----------+------------------+------------------+
|  1|2021-12-01|              68.0|              68.0|
|  1|2021-12-02| 68.36000137329103| 68.36000137329103|
|  1|2021-12-03|              null| 68.36000137329103|
|  1|2021-12-04| 68.53999862670898| 68.53999862670898|
|  1|2021-12-05|              68.9|              68.9|
|  2|2021-12-01|              61.7|              61.7|
|  2|2021-12-02| 61.51999931335449| 61.51999931335449|
|  2|2021-12-03|              61.7|              61.7|
|  2|2021-12-04|              null|              61.7|
|  2|2021-12-05|62.060001373291016|62.060001373291016|
|  2|2021-12-05| 62.23999862670898| 62.23999862670898|
|  3|2021-12-01| 64.93999862670898| 64.93999862670898|
|  3|2021-12-02|  65.1199993133545|  65.1199993133545|
|  3|2021-12-03|              null|  65.1199993133545|
|  4|2021-12-01| 64.76000137329102| 64.76000137329102|
+---+-----

                                                                                

In [38]:
window = Window\
    .partitionBy('Id')\
    .orderBy('Date')\
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)

filled_cols = F.first(beer_df.F, ignorenulls=True).over(window)
beer_df = beer_df.withColumn('F[Filled]', filled_cols)

beer_df.select(['Id', 'Date', 'F', 'F[Filled]']).orderBy(['Id', 'Date']).show()



+---+----------+------------------+------------------+
| Id|      Date|                 F|         F[Filled]|
+---+----------+------------------+------------------+
|  1|2021-12-01|              68.0|              68.0|
|  1|2021-12-02| 68.36000137329103| 68.36000137329103|
|  1|2021-12-03|              null| 68.53999862670898|
|  1|2021-12-04| 68.53999862670898| 68.53999862670898|
|  1|2021-12-05|              68.9|              68.9|
|  2|2021-12-01|              61.7|              61.7|
|  2|2021-12-02| 61.51999931335449| 61.51999931335449|
|  2|2021-12-03|              61.7|              61.7|
|  2|2021-12-04|              null|62.060001373291016|
|  2|2021-12-05|62.060001373291016|62.060001373291016|
|  2|2021-12-05| 62.23999862670898| 62.23999862670898|
|  3|2021-12-01| 64.93999862670898| 64.93999862670898|
|  3|2021-12-02|  65.1199993133545|  65.1199993133545|
|  3|2021-12-03|              null|              null|
|  4|2021-12-01| 64.76000137329102| 64.76000137329102|
+---+-----

                                                                                

In [None]:
cols = []

for i in range(1, 3):
    new_col = f'F_-{i}'
    cols.append(new_col)

    window = Window\
        .partitionBy('Id')\
        .orderBy('Date')\
        .rowsBetween(Window.currentRow -i, Window.currentRow -i)

    lag_col = F.first(beer_df['F[Filled]'], ignorenulls=True).over(window)
    beer_df = beer_df.withColumn(new_col, lag_col)

beer_df.select(['Id', 'Date', 'F[Filled]', *cols]).orderBy(['Id', 'Date']).show()

#### GroupBy Operations

In [None]:
beer_df_grouped = beer_df.groupBy('Id')
print(beer_df_grouped.count().show())
print(beer_df_grouped.min('C').show())
print(beer_df_grouped.max('C').show())


In [None]:
beer_df.groupBy('Id').agg(
    F.count('Id'), 
    F.min('C'), 
    F.max('C')
).show()


#### Other Operations

In [None]:
beer_df.orderBy('C', ascending=False).show()

In [None]:
beer_df.orderBy(['Id', 'C'], ascending=[False, True]).show()

In [None]:
beer_df.dropDuplicates(['Id']).show()

In [None]:
beer_df.describe().show()

# SQL Queries

In [None]:
beer_schema = StructType([
    StructField('Id', IntegerType()),
    StructField('InitialDate', DateType()),
    StructField('Type', StringType()),
])

temp_hist_schema = StructType([
    StructField('Date', DateType()),
    StructField('Id', IntegerType()),
    StructField('Temp', FloatType()),
])

beer_df = spark.read.csv('data/beer.csv', schema=beer_schema, header=True)
beer_df.createOrReplaceTempView('Beer')


temp_hist_df = spark.read.csv('data/beer_temp_hist.txt', sep=';', schema=temp_hist_schema)
temp_hist_df.createOrReplaceTempView('TempHist')

In [None]:
beer_hist_df = temp_hist_df.join(beer_df, on='Id')
print(beer_hist_df.show(5))

beer_hist_df = spark.sql("""
SELECT 
    TempHist.Id,
    TempHist.Date,
    TempHist.Temp,
    Beer.InitialDate,
    Beer.Type
FROM TempHist
INNER JOIN Beer ON 
    Beer.Id = TempHist.Id
""")

print(beer_hist_df.show(5))

beer_hist_df.createOrReplaceTempView('BeerHist')


In [None]:
beer_df = spark.sql('SELECT * FROM BeerHist')
beer_df.collect()

In [None]:
bear_id = beer_hist_df.select('Id')
print(bear_id.show(2))

ear_id = spark.sql('SELECT Id FROM BeerHist')
print(bear_id.show(2))

In [None]:
beer_2 = beer_hist_df.filter(beer_hist_df.Id == 2).orderBy('Temp', ascending=False)
print(beer_2.show())

beer_2 = spark.sql('SELECT * FROM BeerHist WHERE Id = 2 order by Temp desc')
print(beer_2.show())

In [None]:
print(
    beer_hist_df.groupBy('Id').agg(
        F.min('Temp'), F.max('Temp')
    ).show()
)

print(
    spark.sql('SELECT Id, MIN(Temp), max(Temp) FROM BeerHist GROUP BY id').show()

)
