In [1]:
import pyspark
from pyspark.sql import SparkSession 

### Inspired by pandas DataFrames in structure, format, and a few specific operations,
#### Spark DataFrames are like distributed in-memory tables with named columns and schemas, where each column has a specific data type: integer, string, array, map, real,date, timestamp, etc.

In [2]:
spark = SparkSession.builder.getOrCreate()

### Dealing with missing data with pyspark
#### Missing Data
1. Keep them.
2. Remove them.
3. Fill them with some values.

In [3]:
df = spark.read.csv('NullData.csv', header=True, inferSchema=True)

In [4]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [5]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [6]:
## How to deal with Missin Values in Spark?
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



#### If specified, drop rows that have less than `thresh` non-null values.
#### This overwrites the `how` parameter.

In [7]:

df.na.drop(thresh=1).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [8]:
## thresh=2 means that the row that has at least two values in it
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [9]:
## thresh=3 means that the row that has at least 3 values in it

df.na.drop(thresh=3).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [10]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
df.na.drop(subset=['Name','Sales'], thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
## how to fill the null values in Spark?

df.na.fill('No Name').show()
## it went to the string column that has nulls automatically by spark

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| NULL|
|emp2|No Name| NULL|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [13]:
df.na.fill(25).show()
## it went to the integer column that has nulls automatically by spark


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| 25.0|
|emp2| NULL| 25.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [14]:
df.na.fill(25, subset=['Sales']).show()


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| 25.0|
|emp2| NULL| 25.0|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### DataFrame Operations

In Python, it’s possible to access a DataFrame’s columns either by attribute <b>(df.age)</b> or by indexing <b>(df['age'])</b>. While the former is convenient for interactive data exploration, users are highly encouraged to use the latter form, which is future proof and won’t break with column names that are also attributes on the DataFrame class.

In [15]:
from pyspark.sql.functions import mean 
mean_value = df.select(mean(df['Sales']).alias('SalesMean')).collect()[0].SalesMean


##mean_value = df.select(mean(df['Sales']).alias('SalesMean')).collect()[0][0]


In [16]:
mean_value

400.5

In [17]:
df.na.fill(mean_value, subset= ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [18]:
## Sometime in machine learning, you want to transform subset of Spark DF to Pandas DF 
##to make some feature engineering, here is how to do it 
df_toPandas = df.toPandas()

In [19]:
df_toPandas.head()

Unnamed: 0,Id,Name,Sales
0,emp1,John,
1,emp2,,
2,emp3,,345.0
3,emp4,Cindy,456.0


## Schemas and Creating DataFrames

A schema in Spark defines the column names and associated data types for a DataFrame. Most often, schemas come into play when you are reading structured data
from an external data source Defining a schema
up front as opposed to taking a schema-on-read approach offers three benefits:
<b>
1. You relieve Spark from the onus of inferring data types.
2. You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
3. You can detect errors early if data doesn’t match the schema.
</b>

<i>So, it is encouraged to always define your schema up front whenever you want to
read a large file from a data source.</i>

In [20]:
df_fire = spark.read.csv('sf-fire-calls.csv', header=True,inferSchema=True)

In [21]:
df_fire.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

#### we are working with BigData, to tell Spark to infer the schema with huge dataset
#### it will go to read all the dataset and this may be too much load and processing for Spark
#### to solve that, use samplingRatio=0.001 feature, to take only small sample to infer the schema

In [22]:

df_fire_sample = spark.read.csv('sf-fire-calls.csv', header=True,inferSchema=True, samplingRatio=0.001)

In [23]:
df_fire_sample.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)

In [24]:
df_fire_sample.schema

StructType([StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True), StructField('IncidentNumber', IntegerType(), True), StructField('CallType', StringType(), True), StructField('CallDate', StringType(), True), StructField('WatchDate', StringType(), True), StructField('CallFinalDisposition', StringType(), True), StructField('AvailableDtTm', StringType(), True), StructField('Address', StringType(), True), StructField('City', StringType(), True), StructField('Zipcode', IntegerType(), True), StructField('Battalion', StringType(), True), StructField('StationArea', IntegerType(), True), StructField('Box', IntegerType(), True), StructField('OriginalPriority', StringType(), True), StructField('Priority', StringType(), True), StructField('FinalPriority', IntegerType(), True), StructField('ALSUnit', BooleanType(), True), StructField('CallTypeGroup', StringType(), True), StructField('NumAlarms', IntegerType(), True), StructField('UnitType', StringType(), True), 

#### Now you have the schema from the sample, you can take it and pass to the original dataset

In [25]:
df_fire = spark.read.csv('sf-fire-calls.csv', header=True,schema=df_fire_sample.schema)

In [26]:
df_fire.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)

In [27]:
from pyspark.sql.functions import *
import pyspark.sql.functions as fn

In [28]:
df_Medical = df_fire.select('IncidentNumber', 'AvailableDtTm', 'CallType')\
        .where(col('CallType')=='Medical Incident') 

In [29]:
df_Medical.show(5, truncate=False)

+--------------+----------------------+----------------+
|IncidentNumber|AvailableDtTm         |CallType        |
+--------------+----------------------+----------------+
|2003241       |01/11/2002 03:01:18 AM|Medical Incident|
|2003242       |01/11/2002 02:39:50 AM|Medical Incident|
|2003343       |01/11/2002 12:06:57 PM|Medical Incident|
|2003348       |01/11/2002 01:08:40 PM|Medical Incident|
|2003381       |01/11/2002 03:31:02 PM|Medical Incident|
+--------------+----------------------+----------------+
only showing top 5 rows



In [30]:
df_MedicalNotNull = df_fire.select('IncidentNumber', 'AvailableDtTm', 'CallType')\
        .where(col('CallType').isNotNull()) 

In [31]:
df_MedicalNotNull.show(30)

+--------------+--------------------+--------------------+
|IncidentNumber|       AvailableDtTm|            CallType|
+--------------+--------------------+--------------------+
|       2003235|01/11/2002 01:51:...|      Structure Fire|
|       2003241|01/11/2002 03:01:...|    Medical Incident|
|       2003242|01/11/2002 02:39:...|    Medical Incident|
|       2003250|01/11/2002 04:16:...|        Vehicle Fire|
|       2003259|01/11/2002 06:01:...|              Alarms|
|       2003279|01/11/2002 08:03:...|      Structure Fire|
|       2003301|01/11/2002 09:46:...|              Alarms|
|       2003304|01/11/2002 09:58:...|              Alarms|
|       2003343|01/11/2002 12:06:...|    Medical Incident|
|       2003348|01/11/2002 01:08:...|    Medical Incident|
|       2003381|01/11/2002 03:31:...|    Medical Incident|
|       2003382|01/11/2002 02:59:...|      Structure Fire|
|       2003399|01/11/2002 04:22:...|    Medical Incident|
|       2003403|01/11/2002 04:18:...|    Medical Inciden

In [32]:
df_CallDistinct = df_fire.select('CallType')\
        .where(col('CallType').isNotNull()).distinct()

In [33]:
df_CallDistinct.show()

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
+--------------------+
only showing top 20 rows



In [34]:
df_fire.select('CallType','City','UnitID').where(col('CallType').isNotNull()) \
        .distinct() \
        .sort('CallType', ascending=False) \
        .show(50,truncate=False)

+----------------------+-------------+------+
|CallType              |City         |UnitID|
+----------------------+-------------+------+
|Watercraft in Distress|SF           |E35   |
|Watercraft in Distress|SF           |RC1   |
|Watercraft in Distress|SF           |E16   |
|Watercraft in Distress|PR           |E34   |
|Watercraft in Distress|SF           |T08   |
|Watercraft in Distress|SF           |E02   |
|Watercraft in Distress|SF           |E13   |
|Watercraft in Distress|SF           |E28   |
|Watercraft in Distress|San Francisco|E35   |
|Watercraft in Distress|SF           |FB1   |
|Watercraft in Distress|SF           |RB1   |
|Watercraft in Distress|TI           |B03   |
|Watercraft in Distress|SAN FRANCISCO|B08   |
|Watercraft in Distress|FM           |94    |
|Watercraft in Distress|San Francisco|B10   |
|Watercraft in Distress|San Francisco|RB1   |
|Watercraft in Distress|San Francisco|RA48  |
|Watercraft in Distress|San Francisco|RS2   |
|Watercraft in Distress|SF        

#### Important Operations for Spark DF

In [35]:
df_fire2 = df_fire.withColumn('Delay in Seconds', col('Delay') * 60)

In [36]:
df_fire2.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)

In [37]:
df_fire2.withColumnRenamed('Delay', 'DelayInMins')

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: int, Box: int, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: int, Neighborhood: string, Location: string, RowID: string, DelayInMins: double, Delay in Seconds: double]

In [38]:
df_fire_dt = df_fire.withColumn('IncidentDate',to_timestamp(col('CallDate'), 'MM/dd/yyyy')).drop('CallDate')

In [39]:
df_fire_dt.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: integer (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: integer (nulla

In [40]:
df_fire_dt.select('CallType','CallNumber', 'IncidentDate') \
          .where(fn.year('IncidentDate')==2003) \
          .alias('IncidentYear').show()

+----------------+----------+-------------------+
|        CallType|CallNumber|       IncidentDate|
+----------------+----------+-------------------+
|Medical Incident|  30010041|2003-01-01 00:00:00|
|Medical Incident|  30010045|2003-01-01 00:00:00|
|Medical Incident|  30010068|2003-01-01 00:00:00|
|          Alarms|  30010080|2003-01-01 00:00:00|
|  Structure Fire|  30010086|2003-01-01 00:00:00|
|Medical Incident|  30010134|2003-01-01 00:00:00|
|          Alarms|  30010135|2003-01-01 00:00:00|
|    Vehicle Fire|  30010140|2003-01-01 00:00:00|
|Medical Incident|  30010176|2003-01-01 00:00:00|
|Medical Incident|  30010226|2003-01-01 00:00:00|
|           Other|  30010240|2003-01-01 00:00:00|
|Medical Incident|  30010310|2003-01-01 00:00:00|
|  Structure Fire|  30010316|2003-01-01 00:00:00|
|Medical Incident|  30010348|2003-01-01 00:00:00|
|Medical Incident|  30010360|2003-01-01 00:00:00|
|Medical Incident|  30010361|2003-01-01 00:00:00|
|Medical Incident|  30010377|2003-01-01 00:00:00|


In [41]:
df_fire_dt.select('CallType','CallNumber',fn.year('IncidentDate').alias('IncidentYear')) \
            .where('IncidentYear=2000') \
            .show()

+--------------------+----------+------------+
|            CallType|CallNumber|IncidentYear|
+--------------------+----------+------------+
|    Medical Incident|   1040031|        2000|
|Citizen Assist / ...|   1040086|        2000|
|    Medical Incident|   1040236|        2000|
|        Outside Fire|   1040263|        2000|
|    Medical Incident|   1050006|        2000|
|    Medical Incident|   1050046|        2000|
|    Medical Incident|   1050051|        2000|
|    Medical Incident|   1050103|        2000|
|    Medical Incident|   1050154|        2000|
|    Medical Incident|   1050186|        2000|
|    Medical Incident|   1050312|        2000|
|    Medical Incident|   1050364|        2000|
|    Medical Incident|   1050374|        2000|
|    Medical Incident|   1060076|        2000|
|              Alarms|   1060094|        2000|
|    Medical Incident|   1060128|        2000|
|      Structure Fire|   1060140|        2000|
|               Other|   1060165|        2000|
|      Struct

df_fire_dt.select('CallType') \
          .where(col('CallType').isNotNull()) \
          .groupBy('CallType') \
          .count().show(truncate=False)

In [88]:
### to get the number of Partitions 

num_partitions = df_fire.rdd.getNumPartitions()

# Display the number of partitions
print(f"Number of partitions in the DataFrame: {num_partitions}")

Number of partitions in the DataFrame: 8


#### How to choose the number of partitions in your Spark Cluster? 


Choosing the optimal number of partitions in Spark is crucial for achieving a balance between parallelism and resource efficiency. Here's a guide to help you determine the best number of partitions:

## 1. Default Behavior

- **HDFS/S3 Files**: Spark usually defaults to one partition per HDFS/S3 block (typically 128 MB or 256 MB).
- **DataFrames/RDDs**: The default number of partitions is based on the cluster's configuration, particularly `spark.default.parallelism`, which is typically set to the number of cores in the cluster.

## 2. General Guidelines

- **Rule of Thumb**: A common heuristic is to aim for 2-4 partitions per CPU core in your cluster. For example, if your cluster has 100 cores, you might start with 200-400 partitions.
- **Data Size**: A typical partition size is 128 MB to 256 MB. If your dataset is 1 TB, a good starting point might be around 4000 to 8000 partitions.

## 3. Workload Characteristics

- **I/O Intensive Workloads**: If the workload is I/O-bound (e.g., reading/writing data), more partitions can help improve parallelism and reduce job runtime.
- **CPU Intensive Workloads**: If the workload is CPU-bound (e.g., heavy computations), fewer partitions can help reduce overhead from task scheduling and increase the effectiveness of each task.

## 4. Repartitioning

- **Repartition**: If your partitions are too large, you can repartition the data to create more partitions, which is particularly useful before a shuffle operation.

    ```python
    df_repartitioned = df.repartition(num_partitions)
    ```

- **Coalesce**: If your partitions are too small, you can coalesce them to reduce the number of partitions, which is useful after a shuffle operation.

    ```python
    df_coalesced = df.coalesce(num_partitions)
    ```

## 5. Experiment and Monitor

- **Experimentation**: Start with a reasonable number of partitions based on the guidelines and adjust based on the workload's performance.
- **Monitoring**: Use Spark’s web UI to monitor the stages and tasks. Look for skewed partitions (some tasks taking significantly longer than others) and adjust accordingly.
- **Shuffle Partitions**: For operations that involve a shuffle, like `groupBy`, `join`, or `reduceByKey`, you can control the number of shuffle partitions using:

    ```python
    spark.conf.set("spark.sql.shuffle.partitions", num_partitions)
    ```

## 6. Cluster Resources

Consider the available memory and CPU resources in your cluster. Too many partitions might lead to excessive task scheduling overhead, while too few might lead to inefficient use of resources and longer job execution times.


In [51]:
df_fire_dt.select('CallType') \
          .where(col('CallType').isNotNull()) \
          .groupBy('CallType') \
          .agg(fn.count(col('CallType')).alias('NumberofCalls')) \
          .orderBy('NumberofCalls', ascending=False) \
          .show(truncate=False)

+-------------------------------+-------------+
|CallType                       |NumberofCalls|
+-------------------------------+-------------+
|Medical Incident               |113794       |
|Structure Fire                 |23319        |
|Alarms                         |19406        |
|Traffic Collision              |7013         |
|Citizen Assist / Service Call  |2524         |
|Other                          |2166         |
|Outside Fire                   |2094         |
|Vehicle Fire                   |854          |
|Gas Leak (Natural and LP Gases)|764          |
|Water Rescue                   |755          |
|Odor (Strange / Unknown)       |490          |
|Electrical Hazard              |482          |
|Elevator / Escalator Rescue    |453          |
|Smoke Investigation (Outside)  |391          |
|Fuel Spill                     |193          |
|HazMat                         |124          |
|Industrial Accidents           |94           |
|Explosion                      |89     

### Spark SQL

In [56]:
df = spark.read.csv('departuredelays.csv',header=True,inferSchema=True)

In [55]:
df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

##### How to create a spark sql statement? 

In [59]:
df.createOrReplaceTempView('UsDelays')

In [68]:
query1= spark.sql("select * from UsDelays  where distance > 300")

In [69]:
query1.show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [73]:
# same as query above but using Spark Dataframes
df.select('*').where('distance > 300').show()

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [80]:
query2 = spark.sql(""" 
SELECT delay,origin, destination, 
CASE WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flights_Delays
FROM UsDelays
ORDER BY 2,1 Desc""")

In [81]:
query2.show()

+-----+------+-----------+--------------+
|delay|origin|destination|Flights_Delays|
+-----+------+-----------+--------------+
|  333|   ABE|        ATL|   Long Delays|
|  305|   ABE|        ATL|   Long Delays|
|  275|   ABE|        ATL|   Long Delays|
|  257|   ABE|        ATL|   Long Delays|
|  247|   ABE|        ATL|   Long Delays|
|  247|   ABE|        DTW|   Long Delays|
|  219|   ABE|        ORD|   Long Delays|
|  211|   ABE|        ATL|   Long Delays|
|  197|   ABE|        DTW|   Long Delays|
|  192|   ABE|        ORD|   Long Delays|
|  180|   ABE|        ATL|   Long Delays|
|  173|   ABE|        DTW|   Long Delays|
|  165|   ABE|        ATL|   Long Delays|
|  159|   ABE|        ORD|   Long Delays|
|  159|   ABE|        ATL|   Long Delays|
|  158|   ABE|        ATL|   Long Delays|
|  151|   ABE|        DTW|   Long Delays|
|  127|   ABE|        ATL|   Long Delays|
|  121|   ABE|        DTW|   Long Delays|
|  118|   ABE|        DTW|  Short Delays|
+-----+------+-----------+--------

#### Data Sources Connectors

#### To connect any database with Spark you can use spark.read.jdbc or spark.read.odbc

In [None]:
# Configuration for PostgreSQL connection
jdbc_url = "jdbc:postgresql://your_host:your_port/your_database"
connection_properties = {
    "user": "your_username",
    "password": "your_password",
    "driver": "org.postgresql.Driver"
}

# Reading data from PostgreSQL table
df = spark.read.jdbc(url=jdbc_url, table="your_table", properties=connection_properties)

# Show the data
df.show()


In [None]:
# ODBC Configuration for PostgreSQL connection
odbc_url = "jdbc:odbc://your_host:your_port/your_database"
odbc_properties = {
    "driver": "org.postgresql.Driver",
    "user": "your_username",
    "password": "your_password"
}

# Reading data from PostgreSQL using ODBC
df = spark.read.format("jdbc").option("url", odbc_url) \
    .option("dbtable", "your_table") \
    .option("user", odbc_properties["user"]) \
    .option("password", odbc_properties["password"]) \
    .option("driver", odbc_properties["driver"]) \
    .load()

# Show the data
df.show()


#### Reading Distributed Formats

In [83]:
df_parquet = spark.read.parquet('userdata.parquet', header=True)

In [86]:
df_parquet.show(1)

+-------------------+---+----------+---------+----------------+------+-----------+----------------+---------+---------+--------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender| ip_address|              cc|  country|birthdate|  salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+-----------+----------------+---------+---------+--------+----------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|1.197.201.2|6759521864920116|Indonesia| 3/8/1971|49756.53|Internal Auditor|   1E+02|
+-------------------+---+----------+---------+----------------+------+-----------+----------------+---------+---------+--------+----------------+--------+
only showing top 1 row



#### writing it to another format

In [87]:
df_parquet.write.csv('from_parquet_to_csv.csv')

#### Genral use to read any format is to use : spark.read.format and options

In [90]:
spark.read.format('json') \
    .option('inferSchema','true') \
    .option('header','true') \
    .load('people.json').show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



#### Another example using format for read with database connection

In [91]:
def create_spark_session():
    """
    Create and configure a Spark session.
    """
    spark = SparkSession.builder.config("spark.jars", "/Drivers/SQL_Sever/jdbc/postgresql-42.7.3.jar").getOrCreate()
    return spark

def load_data_from_postgres(spark, tbl_list):
    """
    Load data from PostgreSQL into Spark dataframes.
    
    Args:
    - spark: The Spark session object.
    - tbl_list: A list of table names to be loaded from PostgreSQL.
    
    Returns:
    A dictionary of Spark dataframes.
    """
    dataframe = {}
    for table in tbl_list:
        df = spark.read.format("jdbc") \
            .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", f"{table}") \
            .option("user", "postgres") \
            .option("password", "postgres") \
            .load()
        dataframe[table] = df
    return dataframe

#### Another example using format for writing on database 

In [92]:
def create_and_load_dim_date(spark):
    """
    Create the dim_date DataFrame and load it into a PostgreSQL table.
    """
    # Example logic to calculate min_date and date_diff
    # You'll need to replace this with actual logic to determine these values
    min_date = '2017-08-11'
    max_date = '2023-02-25'
    date_diff = (to_date(lit(max_date), 'yyyy-MM-dd') - to_date(lit(min_date), 'yyyy-MM-dd')).days

    # Now create the date_df DataFrame
    date_df = spark.range(date_diff + 1).select(expr(f"date_add(to_date('{min_date}', 'yyyy-MM-dd'), id)").alias("Date"))
    
    # Create the dim_date DataFrame with additional date parts
    dim_date_df = date_df.select(
        "Date",
        year("Date").alias("Year"),
        month("Date").alias("Month"),
        dayofmonth("Date").alias("Day"),
        dayofweek("Date").alias("Weekday"),
        quarter("Date").alias("Quarter")
    )
    
    # Load the dim_date DataFrame to the PostgreSQL database
    dim_date_df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "dwh.DimDate") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .mode("overwrite") \
        .save()


#### Thank you!