# DataFrame Operations

In this tutorial, we will explore PySpark DataFrames, write queries, and become more familiar with key DataFrame operations.

We will work with a synthetically generated dataset of 67,500 rows, called "noisy_neighbors_reports", which contains records of noise complaints made in NYC.

❗Note: You can watch Noam's video on a **similar** tutorial [here](https://panoptotech.cloud.panopto.eu/Panopto/Pages/Viewer.aspx?id=45909ccc-decb-4a40-95f4-afa8014bbc0f)


## **Initialization**

Installing Java and PySpark on the Colab machine

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

openjdk version "1.8.0_442"
OpenJDK Runtime Environment (build 1.8.0_442-8u442-b06~us1-0ubuntu1~22.04-b06)
OpenJDK 64-Bit Server VM (build 25.442-b06, mixed mode)


In [2]:
!pip install --force-reinstall pyspark==3.4.0
!pip install findspark

Collecting pyspark==3.4.0
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.4.0)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.4.0
    Uninstalling pyspark-3.4.0:
      Successfully uninstalled pyspark-3.4.0
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-spark-connect 0.5.3 requires pyspark[connect]>=3.5, but you have pyspark 3.4.0 which is incompatible.
Successfully installed py4j-0.10.9.7 pyspark-3.4.0


Initializing a Spark Session

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder\
  .appName('PySpark Data Processing')\
  .config('spark.ui.port', '4050').getOrCreate()
sc = spark.sparkContext

In [4]:
sc

## Noisy Neighbors

### Understanding the data

In [5]:
# Read a CSV into a dataframe, inferring the schema (see note).

dataPath = "./noisy_neighbors_reports.csv"
neighbors = spark.read.format("csv")\
                .option("header","true")\
                .option("inferSchema", "true")\
                .load(dataPath)

❗❗ ***.option("inferSchema", "true")*** ❗❗

`inferSchema` makes Spark figure out the column types automatically. HOWEVER, this comes at the cost of reading the data more than once, and there is no guarantee that the inferred types will be the ones we want.

In [6]:
neighbors.show(5)

+-----------------+------------+---------+-----------+--------------+----------------------+
|    Building Type|Incident Zip|  Borough|Day of Week|Date of Report|Duration of Call (min)|
+-----------------+------------+---------+-----------+--------------+----------------------+
|     Private Home|       10462|    Bronx|          1|    2023-04-24|                    39|
|            Villa|       11086|   Queens|          7|    2023-09-03|                    13|
|            Hotel|       10068|Manhattan|          3|    2023-03-22|                     8|
|            Hotel|       10466|    Bronx|          6|    2023-06-03|                    19|
|Apartment Complex|       11217| Brooklyn|          3|    2023-11-08|                    20|
+-----------------+------------+---------+-----------+--------------+----------------------+
only showing top 5 rows



In [7]:
neighbors.printSchema()

root
 |-- Building Type: string (nullable = true)
 |-- Incident Zip: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Date of Report: date (nullable = true)
 |-- Duration of Call (min): integer (nullable = true)



❗ **Reminder**: a schema is a StructType made up of a number of fields, StructFields, that have a name, type, and a Boolean flag which specifies whether that column can contain missing or null values.

In [8]:
neighbors.schema

StructType([StructField('Building Type', StringType(), True), StructField('Incident Zip', IntegerType(), True), StructField('Borough', StringType(), True), StructField('Day of Week', IntegerType(), True), StructField('Date of Report', DateType(), True), StructField('Duration of Call (min)', IntegerType(), True)])

Let's see how to manually specify a known schema for a data file, so we can skip the costly "Infer Schema":



In [9]:
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType

schema_tut = StructType([StructField('Building Type', StringType(), True),
                     StructField('Incident Zip', DoubleType(), True),
                     StructField('Borough', StringType(), True),
                     StructField('Day of Week', IntegerType(), True),
                     StructField('Date of Report', StringType(), True),#Notice we decided to deal with the date as a String
                     StructField('Duration of Call (min)', IntegerType(), True)])


neighbors = spark.read.format("csv")\
                .option("header","true")\
                .schema(schema_tut)\
                .load(dataPath)

neighbors.printSchema()
neighbors.show(5)

root
 |-- Building Type: string (nullable = true)
 |-- Incident Zip: double (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Date of Report: string (nullable = true)
 |-- Duration of Call (min): integer (nullable = true)

+-----------------+------------+---------+-----------+--------------+----------------------+
|    Building Type|Incident Zip|  Borough|Day of Week|Date of Report|Duration of Call (min)|
+-----------------+------------+---------+-----------+--------------+----------------------+
|     Private Home|     10462.0|    Bronx|          1|    2023-04-24|                    39|
|            Villa|     11086.0|   Queens|          7|    2023-09-03|                    13|
|            Hotel|     10068.0|Manhattan|          3|    2023-03-22|                     8|
|            Hotel|     10466.0|    Bronx|          6|    2023-06-03|                    19|
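|Apartment Complex|     11217.0| Brooklyn|          3|    2023-11-08|                    20|
+-----------------+------------+---------+-----------+--------------+----------------------+
only showing top 5 rows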

'Duration of Call (min)' is too long for our liking, so let's rename it to just 'Duration'.

Introducing ***.withColumnRenamed('original column name', 'new column name')***

In [10]:
neighbors = neighbors.withColumnRenamed('Duration of Call (min)', 'Duration')

In [11]:
neighbors.printSchema()

root
 |-- Building Type: string (nullable = true)
 |-- Incident Zip: double (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Date of Report: string (nullable = true)
 |-- Duration: integer (nullable = true)



Let's perform a simple column selection:

In [12]:
neighbors.select(['Borough', 'Building Type']).show(5, truncate=False)

+---------+-----------------+
|Borough  |Building Type    |
+---------+-----------------+
|Bronx    |Private Home     |
|Queens   |Villa            |
|Manhattan|Hotel            |
|Bronx    |Hotel            |
|Brooklyn |Apartment Complex|
+---------+-----------------+
only showing top 5 rows



### Understanding **Transformations** and **Actions**

Let's inspect how transformations and actions work.

First, we will define a few transformations, and only then call an action.

The transformations are simple. First, we group by borough and compute the average call duration.
Then we inner-join that result back to the original dataset on the `Borough` column, which adds the borough's average to each record.
Finally, we select the columns we want from the joined dataset.

In [13]:
#Query 1: average call durations per borough
df1 = neighbors.groupBy(["Borough"])\
              .avg("Duration")\
              .withColumnRenamed("avg(Duration)","avg_dur_bor")

#Query 2: Selecting relevant columns after adding the average to each record.
df2 = df1.join(neighbors, on=["Borough"], how='inner')\
          .select(["Incident Zip","Borough","Day of Week","Duration","avg_dur_bor"])

# Order by the average duration, in ascending order
df1.orderBy('avg_dur_bor',ascending=True).show()

df2.show(10)

+-------------+------------------+
|      Borough|       avg_dur_bor|
+-------------+------------------+
|Staten Island| 30.37935138351681|
|    Manhattan|30.488801833099267|
|     Brooklyn|30.501617409204528|
|        Bronx|30.506580893226857|
|       Queens|30.583762405790612|
+-------------+------------------+

+------------+-------------+-----------+--------+------------------+
|Incident Zip|      Borough|Day of Week|Duration|       avg_dur_bor|
+------------+-------------+-----------+--------+------------------+
|     10462.0|        Bronx|          1|      39|30.506580893226857|
|     11086.0|       Queens|          7|      13|30.583762405790612|
|     10068.0|    Manhattan|          3|       8|30.488801833099267|
|     10466.0|        Bronx|          6|      19|30.506580893226857|
|     11217.0|     Brooklyn|          3|      20|30.501617409204528|
|     11060.0|       Queens|          5|      25|30.583762405790612|
|     10061.0|    Manhattan|          7|      52|30.48880183309

**Spark's Execution Plan**

In [14]:
df2.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Incident Zip#139, Borough#63, Day of Week#141, Duration#105, avg_dur_bor#135]
   +- BroadcastHashJoin [Borough#63], [Borough#140], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=283]
      :  +- HashAggregate(keys=[Borough#63], functions=[avg(Duration#105)])
      :     +- Exchange hashpartitioning(Borough#63, 200), ENSURE_REQUIREMENTS, [plan_id=280]
      :        +- HashAggregate(keys=[Borough#63], functions=[partial_avg(Duration#105)])
      :           +- Project [Borough#63, Duration of Call (min)#66 AS Duration#105]
      :              +- Filter isnotnull(Borough#63)
      :                 +- FileScan csv [Borough#63,Duration of Call (min)#66] Batched: false, DataFilters: [isnotnull(Borough#63)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/noisy_neighbors_reports.csv], PartitionFilters: [], PushedFilters: [IsNotNul

In [15]:
# This will execute the plan that Apache Spark built up previously.
# That is because .count() is an action!
df2.count()

67500

In [16]:
# We can convert to Pandas ONLY IF the data fits in the memory of a single machine (the driver)
df2.sample(0.001).toPandas()

    Incident Zip        Borough  Day of Week  Duration  avg_dur_bor
0        10221.0      Manhattan            2        14    30.488802
1        11094.0         Queens            6        50    30.583762
2        10308.0  Staten Island            4        11    30.379351
3        10466.0          Bronx            3        51    30.506581
4        11018.0         Queens            4        60    30.583762
..           ...            ...          ...       ...          ...
71       10117.0      Manhattan            5        45    30.488802
72       11236.0       Brooklyn            3        58    30.501617
73       10313.0  Staten Island            3         4    30.379351
74       10313.0  Staten Island            1        34    30.379351


In [17]:
# Size of the dataframe in local RAM (Pandas!) in MB
df2.toPandas().memory_usage(deep=True).sum()/(1000*1000)

6.021007

# Check yourself
* Imagine `df2` is a big table. Replace `df2.sample(0.001).toPandas()` with `df2.toPandas()` and run the cell.
What happens?
How much memory would this machine need to load the full Pandas DataFrame?

hint: `df2.limit(100).toPandas().memory_usage(deep=True)`

# Competition

Be the first to find the noisiest zip code in NYC! ⏰

"noisiest" = the zip code from which the most complaint calls were made

In [18]:
# Your code here!

In [None]:
# @title SOLUTION - DON'T PEEK BEFORE YOU TRY!
neighbors.groupBy('Incident Zip').count().orderBy('count', ascending=False).show(3)

## More complex examples

In [20]:
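import pyspark.sql.functions as F
# The wildcard import below brings col, length, when, sum, avg, count, explode, ... into scope.
# Careful: it also shadows Python built-ins such as sum, min and max.
from pyspark.sql.functions import *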

**Task**: For calls made on weekends (Day of Week 6 or 7) that lasted less than 15 minutes, calculate their noisiness score, defined as:
$score = \text{Day of Week}\cdot \frac{\text{Duration}}{\text{number of characters in the borough's name}} + \text{month number}$

In [21]:
noise_score = (
    neighbors
    # Keep only weekend calls (Saturday = 6, Sunday = 7) that were shorter than 15 minutes
    .filter(col('Day of Week').between(6, 7) & (col('Duration') < 15))
    # Count the characters in the borough name (a space counts too, e.g. 'Staten Island' -> 13)
    .withColumn('num_letters_borough', length(col('Borough')))
    # Extract MM from the YYYY-MM-DD string (substr is 1-based) and cast it to an integer
    .withColumn('month_number', col('Date of Report').substr(6, 2).cast('int'))
    # Compute the noisiness score
    .withColumn('score', col('Day of Week') * col('Duration') / col('num_letters_borough') + col('month_number'))
    # Keep only the relevant columns
    .select(['Incident Zip', 'Borough', 'Day of Week', 'Duration', 'num_letters_borough', 'month_number', 'score'])
)

# Sort the results by score in descending order and show the top 10
noise_score.orderBy('score', ascending=False).show(10)


+------------+-------+-----------+--------+-------------------+------------+-----+
|Incident Zip|Borough|Day of Week|Duration|num_letters_borough|month_number|score|
+------------+-------+-----------+--------+-------------------+------------+-----+
|     10473.0|  Bronx|          7|      14|                  5|          11| 30.6|
|     10456.0|  Bronx|          7|      14|                  5|          11| 30.6|
|     10456.0|  Bronx|          7|      14|                  5|          11| 30.6|
|     10465.0|  Bronx|          7|      14|                  5|          11| 30.6|
|     10456.0|  Bronx|          7|      13|                  5|          12| 30.2|
|     10453.0|  Bronx|          7|      13|                  5|          12| 30.2|
|     10468.0|  Bronx|          7|      13|                  5|          12| 30.2|
|     10456.0|  Bronx|          7|      14|                  5|          10| 29.6|
|     10473.0|  Bronx|          7|      14|                  5|          10| 29.6|
|   
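As a sanity check of the formula, take the top row of the output above (Day of Week 7, Duration 14, "Bronx" has 5 characters, month 11):

$$score = 7 \cdot \frac{14}{5} + 11 = 19.6 + 11 = 30.6$$

which matches the `score` column. A short borough name such as "Bronx" gives a small denominator, which helps explain why Bronx calls fill the top of the ranking.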

### Merging zip codes and boroughs into one column and calculating their overall score

In [22]:
# Understand what we're working with
noise_score.printSchema()

root
 |-- Incident Zip: double (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Day of Week: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- num_letters_borough: integer (nullable = true)
 |-- month_number: integer (nullable = true)
 |-- score: double (nullable = true)



In [23]:
merged_noise_score = noise_score.rdd\
        .map(lambda row: ((row['Incident Zip'], row['Borough']), row['score']))\
        .reduceByKey(lambda x, y: x + y)\
        .toDF()\
        .select(col('_1').alias('zip and borough'), col('_2').alias('score'))

# Notice that we switched to the RDD API here to express the aggregation as MapReduce (map + reduceByKey).

merged_noise_score.show(10)

+--------------------+------------------+
|     zip and borough|             score|
+--------------------+------------------+
|   {11086.0, Queens}|145.16666666666666|
| {11243.0, Brooklyn}|           228.875|
|{10313.0, Staten ...| 592.2307692307694|
| {11204.0, Brooklyn}|            228.75|
|    {10463.0, Bronx}| 633.4000000000001|
|{10215.0, Manhattan}| 56.33333333333334|
|    {10461.0, Bronx}| 631.9999999999999|
| {11216.0, Brooklyn}|             223.5|
|{10306.0, Staten ...| 686.8461538461539|
|{10139.0, Manhattan}|              41.0|
+--------------------+------------------+
only showing top 10 rows



**Note**: the same result could have been achieved with DataFrame operations only: .groupBy() and .agg() to sum the scores, then building a new column that combines the zip and the borough.

# **More DataFrame Operations**

***when()***: for conditional column values, i.e. assigning new values based on a condition (***.otherwise()*** supplies the fallback value)

In [24]:
noise_level = neighbors.withColumn("Noise Level", when(col("Duration") > 30, "High").otherwise("Low"))

noise_level.select(['Borough','Incident Zip','Day of Week','Duration','Noise Level']).show(10)

+-------------+------------+-----------+--------+-----------+
|      Borough|Incident Zip|Day of Week|Duration|Noise Level|
+-------------+------------+-----------+--------+-----------+
|        Bronx|     10462.0|          1|      39|       High|
|       Queens|     11086.0|          7|      13|        Low|
|    Manhattan|     10068.0|          3|       8|        Low|
|        Bronx|     10466.0|          6|      19|        Low|
|     Brooklyn|     11217.0|          3|      20|        Low|
|       Queens|     11060.0|          5|      25|        Low|
|    Manhattan|     10061.0|          7|      52|       High|
|       Queens|     11029.0|          1|      60|       High|
|Staten Island|     10302.0|          1|      57|       High|
|    Manhattan|     10050.0|          1|      31|       High|
+-------------+------------+-----------+--------+-----------+
only showing top 10 rows



***between(a,b)***: range filtering (both bounds are inclusive)

In [25]:
neighbors.filter(col("Duration").between(10, 30)).show(5)


+-----------------+------------+-------------+-----------+--------------+--------+
|    Building Type|Incident Zip|      Borough|Day of Week|Date of Report|Duration|
+-----------------+------------+-------------+-----------+--------------+--------+
|            Villa|     11086.0|       Queens|          7|    2023-09-03|      13|
|            Hotel|     10466.0|        Bronx|          6|    2023-06-03|      19|
|Apartment Complex|     11217.0|     Brooklyn|          3|    2023-11-08|      20|
|     Private Home|     11060.0|       Queens|          5|    2023-10-27|      25|
|     Private Home|     10306.0|Staten Island|          7|    2023-05-14|      21|
+-----------------+------------+-------------+-----------+--------------+--------+
only showing top 5 rows



***.groupBy()*** groups the rows by a key, and ***.agg()*** applies aggregate functions to each group:

***sum()*** sums the values in each group

***avg()*** calculates their average

***count()*** counts the rows in each group (`count("*")` counts every row, while `count("col")` skips nulls)

In [26]:
neighbors.groupBy("Borough").agg(
    sum("Duration").alias("Total Duration"),
    avg("Duration").alias("Avg Duration"),
    count("*").alias("Total Complaints")
).show()


+-------------+--------------+------------------+----------------+
|      Borough|Total Duration|      Avg Duration|Total Complaints|
+-------------+--------------+------------------+----------------+
|       Queens|        409853|30.583762405790612|           13401|
|     Brooklyn|        414883|30.501617409204528|           13602|
|Staten Island|        408420| 30.37935138351681|           13444|
|    Manhattan|        412483|30.488801833099267|           13529|
|        Bronx|        412571|30.506580893226857|           13524|
+-------------+--------------+------------------+----------------+



***min()***: the minimum value

***max()***: the maximum value

In [27]:
neighbors.groupBy("Borough").agg(
    min("Duration").alias("Min Duration"),
    max("Duration").alias("Max Duration")
).show()


+-------------+------------+------------+
|      Borough|Min Duration|Max Duration|
+-------------+------------+------------+
|       Queens|           1|          60|
|     Brooklyn|           1|          60|
|Staten Island|           1|          60|
|    Manhattan|           1|          60|
|        Bronx|           1|          60|
+-------------+------------+------------+



(This reveals that the synthetic durations range from 1 to 60 minutes; together with the averages of about 30.5, it is consistent with a uniform distribution over that range.)

***explode()*** to expand an array column into multiple rows, one per array element

***.drop()*** to drop a column or a subset of columns

In [28]:
# Creating a dataframe with an array as one of its columns
data = [
    ("student1", [95, 98, 100], 'Data Science'),
    ("student2", [85, 90], 'Civil Engineering'),
    ("student3", [75, 60, 55], 'Computer Science')
]

df = spark.createDataFrame(data, ["student ID", "Grades", "Major"])
df = df.withColumn("grade", explode(col('Grades'))).drop('Grades', 'Major')
df.show()


+----------+-----+
|student ID|grade|
+----------+-----+
|  student1|   95|
|  student1|   98|
|  student1|  100|
|  student2|   85|
|  student2|   90|
|  student3|   75|
|  student3|   60|
|  student3|   55|
+----------+-----+

