In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,expr,to_timestamp,when, desc, asc, concat, lit, count, avg, udf
import pyspark.sql.functions as F

### Spark session with the web UI on localhost:4050 ###
### These settings can also be supplied at launch time (spark-submit --conf ...)
spark = (
    SparkSession.builder
    .master("local")
    .appName("App")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.ui.port", "4050")
    .enableHiveSupport()
    .getOrCreate()
)

### Alternative without the UI settings ###
# spark = SparkSession.builder.appName('App').getOrCreate()

In [3]:
### Last expression of the cell: Jupyter renders the SparkSession's repr
spark

In [3]:
### Read the CSV letting Spark infer column types (costs an extra pass over the file):
# df_pyspark = spark.read.option('header', 'true').csv('flight_delays/data_flights.csv', inferSchema=True)
# df_pyspark = spark.read.csv('flight_delays/data_flights.csv', header=True, inferSchema=True)

### Recommended: declare the schema up front (DDL string) so no inference pass is needed
schema = "`date` INT, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df_pyspark = spark.read.csv(
    'flight_delays/data_flights.csv',
    schema=schema,
    header=True,
    sep=',',
)

print(df_pyspark)
print(type(df_pyspark))

### Summary statistics of `delay`: count, mean, stddev, min, max
df_pyspark.describe(["delay"]).show()

DataFrame[date: int, delay: int, distance: int, origin: string, destination: string]
<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

+-------+------------------+
|summary|             delay|
+-------+------------------+
|  count|           1391578|
|   mean|12.079802928761449|
| stddev| 38.80773374985683|
|    min|              -112|
|    max|              1642|
+-------+------------------+



In [16]:
### Both calls below are actions (they trigger a Spark job)
### head(n) (like take/first) returns Row objects to the driver; they can be reused to build a new DF
print(df_pyspark.head(3))

### show(n) prints a formatted table itself and returns None,
### so it is called directly rather than wrapped in print()
df_pyspark.show(3)

[Row(date=1011245, delay=6, distance=602, origin='ABE', destination='ATL'), Row(date=1020600, delay=-8, distance=369, origin='ABE', destination='DTW'), Row(date=1021245, delay=-2, distance=602, origin='ABE', destination='ATL')]
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 3 rows

None


In [17]:
### Column names as a Python list
print(df_pyspark.columns)

### Tree view of the schema; printSchema() prints by itself and returns None
df_pyspark.printSchema()

### (column, type) pairs
print(df_pyspark.dtypes)

### StructType object that can be passed back as `schema=` when reading
df_pyspark.schema

['date', 'delay', 'distance', 'origin', 'destination']
root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

None
[('date', 'int'), ('delay', 'int'), ('distance', 'int'), ('origin', 'string'), ('destination', 'string')]


StructType([StructField('date', IntegerType(), True), StructField('delay', IntegerType(), True), StructField('distance', IntegerType(), True), StructField('origin', StringType(), True), StructField('destination', StringType(), True)])

In [38]:
### The same projection written with each column-reference style
# 1) plain column name
df_pyspark.select("delay").show(n=2)
# 2) attribute and item access on the DataFrame
df_pyspark.select(df_pyspark.delay, df_pyspark["delay"] + 10).show(n=2)
# 3) col() helper
df_pyspark.select(col("delay"), col("delay") + 10).show(n=2)
# 4) SQL expressions via expr(), displayed one record per block
df_pyspark.select(expr("delay"), expr("delay+10").alias("new_del")).show(n=2, vertical=True)
# 5) selectExpr shortcut for the same SQL expressions
df_pyspark.selectExpr("delay", "delay+10 AS new_del").show(n=2)

+-----+------------+
|delay|(delay + 10)|
+-----+------------+
|    6|          16|
|   -8|           2|
+-----+------------+
only showing top 2 rows

+-----+------------+
|delay|(delay + 10)|
+-----+------------+
|    6|          16|
|   -8|           2|
+-----+------------+
only showing top 2 rows

-RECORD 0------
 delay   | 6   
 new_del | 16  
-RECORD 1------
 delay   | -8  
 new_del | 2   
only showing top 2 rows

+-----+-------+
|delay|new_del|
+-----+-------+
|    6|     16|
|   -8|      2|
+-----+-------+
only showing top 2 rows

+-----+
|delay|
+-----+
|    6|
|   -8|
+-----+
only showing top 2 rows



In [7]:
### Add (or replace, if the name already exists) a column
new_df = df_pyspark.withColumn("route", concat(col("origin"), lit("-"), col("destination")))
new_df.show(1)  # show() prints the table itself and returns None

### Rename column
print(new_df.withColumnRenamed("date", "new_date").columns)

### Change column type; withColumn names the result after its first argument,
### so an .alias() on the expression would be ignored
print(new_df.withColumn("delay", new_df.delay.cast("float")).dtypes)

### Delete column
print(new_df.drop("date").columns)

### Conditional column: branches are evaluated top-down, so the second one
### covers 120 <= delay <= 360 with no gap at the 360 boundary
df_pyspark.withColumn(
    "Flight_Delays",
    when(col("delay") > 360, 'Very Long Delays')
    .when(col("delay") >= 120, 'Long Delays')
    .otherwise('Short Delays'),
).orderBy(desc("origin"), asc("delay")).show(2)

+-------+-----+--------+------+-----------+-------+
|   date|delay|distance|origin|destination|  route|
+-------+-----+--------+------+-----------+-------+
|1011245|    6|     602|   ABE|        ATL|ABE-ATL|
+-------+-----+--------+------+-----------+-------+
only showing top 1 row

None
['new_date', 'delay', 'distance', 'origin', 'destination', 'route']
[('date', 'int'), ('delay', 'float'), ('distance', 'int'), ('origin', 'string'), ('destination', 'string'), ('route', 'string')]


[Stage 8:>                                                          (0 + 1) / 1]

+-------+-----+--------+------+-----------+-------------+
|   date|delay|distance|origin|destination|Flight_Delays|
+-------+-----+--------+------+-----------+-------------+
|3141155|  -19|     139|   YUM|        PHX| Short Delays|
|2161859|  -17|     139|   YUM|        PHX| Short Delays|
+-------+-----+--------+------+-----------+-------------+
only showing top 2 rows



                                                                                

In [None]:
### Parse `date` into a timestamp.
### The values look like MMddHHmm read as INT, so months 01-09 lose their leading zero
### (1011245 -> "01011245" = Jan 01, 12:45): left-pad to 8 digits before parsing.
### The source has no year field, so the year of `parsed_date` is a placeholder.
df_dates = (
    new_df
    .withColumn("parsed_date", to_timestamp(F.lpad(col("date").cast("string"), 8, "0"), "MMddHHmm"))
    .drop("date")
)
df_dates.select(
    F.month("parsed_date"), F.dayofmonth("parsed_date"),
    F.hour("parsed_date"), F.minute("parsed_date"),
).show(2)

In [4]:
### Take 10 rows
df_small = df_pyspark.limit(10)

### Build a small in-memory DF (one partially-null row, one all-null row) and append it
new_row = spark.createDataFrame(
    data=[
        (1011244, 5, 7, None, None),
        (None, None, None, None, None),
    ],
    schema=df_small.schema,
)
df_null = df_small.union(new_row)

### Count rows (an action) - shown as the cell's last expression
df_null.count()

                                                                                

12

In [39]:
### Filter with a SQL predicate string, then project two columns
df_small.filter("delay>=6").select('delay', 'date').show(n=2)

### AND: combine Column conditions with &, each comparison in its own parentheses
df_small.filter(
    (df_small["delay"] >= 6) & (df_small["date"] < 1051245)
).show(n=2)

### OR with |, negation with ~
df_small.filter(
    ~(df_small.delay >= 6) | (df_small.date < 1051245)
).show(n=2)

### where() is an alias of filter(); isNotNull() excludes NULLs explicitly
df_small.where(
    (col("delay") >= 6) & (col("delay").isNotNull())
).show(n=2)

+-----+-------+
|delay|   date|
+-----+-------+
|    6|1011245|
|   10|1041243|
+-----+-------+
only showing top 2 rows

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 2 rows

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
+-------+-----+--------+------+-----------+
only showing top 2 rows

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 2 rows



In [50]:
### Group, count and sort (any aggregate works: mean, max, sum, ...); rows with NULL origin are filtered out first
(
    df_null
    .where(col("origin").isNotNull())
    .groupBy("origin", "destination")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

### Several aggregates at once with agg()
(
    df_null
    .groupBy("origin", "destination")
    .agg(count("*").alias("count"), avg("delay"))
    .orderBy("count", ascending=False)
    .show()
)

### Same, referencing the functions through the `F` module alias
(
    df_null
    .groupBy("origin", "destination")
    .agg(F.count("*").alias("count"), F.avg("delay"))
    .orderBy("count", ascending=False)
    .show()
)

+------+-----------+-----+
|origin|destination|count|
+------+-----------+-----+
|   ABE|        ATL|    9|
|   ABE|        DTW|    1|
+------+-----------+-----+

+------+-----------+-----+------------------+
|origin|destination|count|        avg(delay)|
+------+-----------+-----+------------------+
|   ABE|        ATL|    9|14.555555555555555|
|  NULL|       NULL|    2|               5.0|
|   ABE|        DTW|    1|              -8.0|
+------+-----------+-----+------------------+

+------+-----------+-----+------------------+
|origin|destination|count|        avg(delay)|
+------+-----------+-----+------------------+
|   ABE|        ATL|    9|14.555555555555555|
|  NULL|       NULL|    2|               5.0|
|   ABE|        DTW|    1|              -8.0|
+------+-----------+-----+------------------+



In [19]:
### Dealing with null values
### Tag every row with an id; the ids are unique but not consecutive (see the output)
df_null = df_null.withColumn("id", F.monotonically_increasing_id())
df_null.show()

### Drop rows containing any null (how="any" is the default; how="all" drops only all-null rows)
print(df_null.na.drop().count())

### Keep only rows with at least 4 non-null values (thresh overrides `how`)
print(df_null.na.drop(thresh=4).count())

### Drop rows where a specific column is null
print(df_null.na.drop(subset=["date"]).count())

### Fill nulls with a string value: only string columns are affected
print(df_null.na.fill("-").tail(2))

### Fill nulls in specific columns only
df_null.na.fill(value=0, subset=["delay"]).tail(2)

+-------+-----+--------+------+-----------+----------+
|   date|delay|distance|origin|destination|        id|
+-------+-----+--------+------+-----------+----------+
|1011245|    6|     602|   ABE|        ATL|         0|
|1020600|   -8|     369|   ABE|        DTW|         1|
|1021245|   -2|     602|   ABE|        ATL|         2|
|1020605|   -4|     602|   ABE|        ATL|         3|
|1031245|   -4|     602|   ABE|        ATL|         4|
|1030605|    0|     602|   ABE|        ATL|         5|
|1041243|   10|     602|   ABE|        ATL|         6|
|1040605|   28|     602|   ABE|        ATL|         7|
|1051245|   88|     602|   ABE|        ATL|         8|
|1050605|    9|     602|   ABE|        ATL|         9|
|1011244|    5|       7|  NULL|       NULL|8589934592|
|   NULL| NULL|    NULL|  NULL|       NULL|8589934593|
+-------+-----+--------+------+-----------+----------+

10
11
11


In [10]:
### SQL
### Temporary views are session-scoped; global temp views are read through the `global_temp` database
df_pyspark.createOrReplaceTempView("temp_view")  # FROM temp_view
df_pyspark.createOrReplaceGlobalTempView("global_temp_view")  # FROM global_temp.global_temp_view

### CASE branches are evaluated top-down, so each WHEN only needs its lower bound.
### Every non-null delay lands in exactly one bucket (no gaps at 120 / 360);
### delay <= 0 (on time or early) is 'Early', a NULL delay yields NULL.
spark.sql("""SELECT delay, origin, destination,
 CASE
 WHEN delay > 360 THEN 'Very Long Delays'
 WHEN delay >= 120 THEN 'Long Delays'
 WHEN delay > 0 THEN 'Short Delays'
 WHEN delay <= 0 THEN 'Early'
 END AS Flight_Delays
 FROM temp_view
 ORDER BY origin, delay DESC""").show(2)

                                                                                

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
+-----+------+-----------+-------------+
only showing top 2 rows

+-----------+----------------+-----------+
|  namespace|        viewName|isTemporary|
+-----------+----------------+-----------+
|global_temp|global_temp_view|       true|
|           |       temp_view|       true|
+-----------+----------------+-----------+

+--------------+
|     namespace|
+--------------+
|       default|
|learn_spark_db|
+--------------+



In [11]:
### Catalog inspection
### listTables() returns a list; print it, since only the cell's last expression is displayed
print(spark.catalog.listTables())
spark.sql("SHOW VIEWS IN GLOBAL_TEMP").show()
spark.sql("SHOW DATABASES").show()
spark.catalog.listColumns("temp_view")

+-----------+----------------+-----------+
|  namespace|        viewName|isTemporary|
+-----------+----------------+-----------+
|global_temp|global_temp_view|       true|
|           |       temp_view|       true|
+-----------+----------------+-----------+

+--------------+
|     namespace|
+--------------+
|       default|
|learn_spark_db|
+--------------+



[Column(name='date', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

In [17]:
### Create (if needed) and switch to a database - idempotent, so the cell survives Restart & Run All
spark.sql("CREATE DATABASE IF NOT EXISTS learn_spark_db")
spark.sql("USE learn_spark_db")

### Managed table: Spark manages both the metadata and the data files
spark.sql("CREATE TABLE IF NOT EXISTS managed_us_delay (date STRING, delay INT, distance INT, origin STRING, destination STRING)")

### Same via the DataFrame API; mode("ignore") makes re-runs a no-op once the table exists
df_small.write.mode("ignore").saveAsTable("managed_us_delay1")
spark.sql("SELECT * FROM managed_us_delay1").show(2)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
+-------+-----+--------+------+-----------+
only showing top 2 rows



In [None]:
### Un-managed (external) table: the metastore records only schema + location,
### the CSV stays where it is (DROP TABLE removes the metadata, not the file).
### Points at the local file read above (a /databricks-datasets path only exists on Databricks);
### header 'true' keeps the CSV header line from becoming a data row.
from pathlib import Path

# as_posix(): forward slashes, so no backslash escapes end up inside the SQL string literal
flights_csv_path = Path("flight_delays/data_flights.csv").resolve().as_posix()
spark.sql(f"""CREATE TABLE IF NOT EXISTS us_delay_flights_tbl
 (date STRING, delay INT, distance INT, origin STRING, destination STRING)
 USING csv
 OPTIONS (PATH '{flights_csv_path}', header 'true')""")

# Via the DataFrame API: writes a copy of the data to `path` and registers it as an un-managed table
# (df_pyspark
#  .write
#  .option("path", "/tmp/data/us_flights_delay")
#  .saveAsTable("us_delay_flights_tbl"))

In [29]:
### PySpark UDFs (row-at-a-time Python functions)
from pyspark.sql.types import LongType

@F.udf(returnType=LongType())
def cubed(s):
    """Cube of `s`; a NULL input yields NULL."""
    if s is None:
        return None
    return s * s * s

# Small temporary view with ids 1..8 for the SQL demo below
spark.range(1, 9).createOrReplaceTempView("udf_test")

df_pyspark.select(cubed(col("delay").cast("int"))).show(2)

# Registering makes the UDF callable by name from SQL
spark.udf.register("cubed", cubed)
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show(2)


+-------------------------+
|cubed(cast(delay as int))|
+-------------------------+
|                      216|
|                     -512|
+-------------------------+
only showing top 2 rows



24/04/08 16:16:26 WARN SimpleFunctionRegistry: The function cubed replaced a previously registered function.


+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
+---+--------+
only showing top 2 rows



In [31]:
### Pandas UDFs (vectorized user defined function)
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

@pandas_udf(LongType())
def cubed2(a: pd.Series) -> pd.Series:
    """Vectorized cube of a batch of values, returned as LongType.

    IntegerType input arrives as an int32 Series, and numpy integer arithmetic
    wraps around silently: cubing any |a| > 1290 overflows int32 (the data
    contains delays up to 1642). Promote to int64 first so the cube fits.
    A batch containing NULLs arrives as float64 (NaN) and is cubed as is.
    """
    if pd.api.types.is_integer_dtype(a):
        a = a.astype("int64")
    return a * a * a

df_pyspark.select(cubed2(col("delay").cast("int"))).show(2)
# Without the decorator: pandas_udf(plain_series_func, returnType=LongType()),
# where plain_series_func is an undecorated Series -> Series function

[Stage 31:>                                                         (0 + 1) / 1]

+--------------------------+
|cubed2(cast(delay as int))|
+--------------------------+
|                       216|
|                      -512|
+--------------------------+
only showing top 2 rows



                                                                                

In [13]:
# `slice` is used as F.slice so the Python builtin `slice` is not shadowed in the notebook namespace
from pyspark.sql.functions import array_contains, array_sort, array_union, array_intersect, explode, size, map_keys
data = [("Alice", [2, 4, 6]),
        ("Bob", [1, 2, 3]),
        ("Charlie", [4, 5, 6])]

df = spark.createDataFrame(data, ["Name", "Numbers"])

### Rows whose array contains 4: sorted array, up to 3 elements from position 2 (1-based), array length
df.filter(array_contains(df.Numbers, 4)).select(
    "Name",
    array_sort(df.Numbers).alias("Sorted_Numbers"),
    F.slice(df.Numbers, 2, 3),
    size(df.Numbers),
).show()

### One output row per array element
df.select("Name", explode(df.Numbers).alias("Number")).show()

+-------+--------------+--------------------+-------------+
|   Name|Sorted_Numbers|slice(Numbers, 2, 3)|size(Numbers)|
+-------+--------------+--------------------+-------------+
|  Alice|     [2, 4, 6]|              [4, 6]|            3|
|Charlie|     [4, 5, 6]|              [5, 6]|            3|
+-------+--------------+--------------------+-------------+

+-------+------+
|   Name|Number|
+-------+------+
|  Alice|     2|
|  Alice|     4|
|  Alice|     6|
|    Bob|     1|
|    Bob|     2|
|    Bob|     3|
|Charlie|     4|
|Charlie|     5|
|Charlie|     6|
+-------+------+



In [14]:
from pyspark.sql.types import StructField, StructType, StringType, MapType

### One string -> string map column per person
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(), StringType()), True),
])
dataDictionary = [
    ('James', {'hair': 'black', 'eye': 'brown'}),
    ('Michael', {'hair': 'brown', 'eye': None}),
    ('Robert', {'hair': 'red', 'eye': 'black'}),
    ('Washington', {'hair': 'grey', 'eye': 'grey'}),
    ('Jefferson', {'hair': 'brown', 'eye': ''}),
]
df = spark.createDataFrame(data=dataDictionary, schema=schema)
df.show(2, truncate=False)

### Pull individual map entries out into their own columns
(
    df.withColumn("hair", df.properties.getItem("hair"))
    .withColumn("eye", df.properties.getItem("eye"))
    .drop("properties")
    .show()
)

### explode() on a map yields one (key, value) row per entry
df.select(df.name, explode(df.properties)).show(2)

### Keys only (map_values() gives the values)
df.select(df.name, map_keys(df.properties)).show(2)

+-------+-----------------------------+
|name   |properties                   |
+-------+-----------------------------+
|James  |{eye -> brown, hair -> black}|
|Michael|{eye -> NULL, hair -> brown} |
+-------+-----------------------------+
only showing top 2 rows

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| NULL|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+

+-----+----+-----+
| name| key|value|
+-----+----+-----+
|James| eye|brown|
|James|hair|black|
+-----+----+-----+
only showing top 2 rows

+-------+--------------------+
|   name|map_keys(properties)|
+-------+--------------------+
|  James|         [eye, hair]|
|Michael|         [eye, hair]|
+-------+--------------------+
only showing top 2 rows



In [20]:
### Cache
### Without cache(), every action recomputes the lineage from the source:
### each count() below re-reads the CSV (inferSchema=True already scans it once at read time).
df = spark.read.csv('flight_delays/data_flights.csv', header=True, inferSchema=True)

# Results are not stored in a variable named `count`: that would shadow pyspark's count()
# imported at the top and break later agg(count("*")) calls in this kernel.
df2 = df.where(col("origin") == "ABE")
abe_count = df2.count()
print(abe_count)

df3 = df.where(col("destination") == "DTW")
dtw_count = df3.count()
print(dtw_count)

[Stage 58:>                                                         (0 + 1) / 1]

448
23310


                                                                                