In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,StructField,IntegerType

In [2]:
spark = (SparkSession.builder.appName('Sparkdfpractice').getOrCreate())

23/09/07 13:55:34 WARN Utils: Your hostname, Obinnas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.2.190 instead (on interface en0)
23/09/07 13:55:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/07 13:55:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/07 13:55:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
#Start by creating the schema of the dataframe
#Data as list of tuples
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [4]:
type(data)

list

In [5]:
#We have defined the schema variable and stated that all fields can be nullable
schema = StructType([
    StructField("Firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("Lastname", StringType(), True),
    StructField("ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Salary", IntegerType(), True)
])

In [6]:
#create a dataframe and assign the schema

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

root
 |-- Firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [7]:
df.show(truncate=False)

                                                                                

+---------+----------+--------+-----+------+------+
|Firstname|middlename|Lastname|ID   |Gender|Salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [8]:
#fire_df.printSchema()

## WORKING WITH STRUCTURED OPERATIONS 

### Reading in the json file

In [9]:
from pyspark.sql.types import ArrayType, FloatType, DateType, BooleanType

In [10]:
persons_schema = StructType ([
    StructField('id',IntegerType(), True),
    StructField('first_name',StringType(), True),
    StructField('last_name',StringType(), True),
    StructField('fav_movies',ArrayType(StringType()), True),
    StructField('salary',FloatType(), True),
    StructField('image_url',StringType(), True),
    StructField('date_of_birth',DateType(), True),
    StructField('active',BooleanType(), True)
]) 

In [11]:
filepath = "./testdata/persons/persons.json"

persons_df = (spark.read.json(filepath,persons_schema,multiLine=True))

In [12]:
#spark.stop()

In [13]:
persons_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- fav_movies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- salary: float (nullable = true)
 |-- image_url: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- active: boolean (nullable = true)



In [14]:
persons_df.show(truncate=False)

+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name  |fav_movies                                                                  |salary |image_url                                      |date_of_birth|active|
+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy      |[I giorni contati]                                                          |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza      |[Musketeer, The, Topralli]                                                  |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie     |[The Forgotten Space, Make It Happen]                    

In [15]:
persons_df.columns

['id',
 'first_name',
 'last_name',
 'fav_movies',
 'salary',
 'image_url',
 'date_of_birth',
 'active']

## Columns and Expressions

In [16]:
from pyspark.sql.functions import col, expr

In [17]:
persons_df.select(col("first_name"),col('last_name'),col('date_of_birth')).show(5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



In [18]:
persons_df.select("first_name",'last_name','date_of_birth').show(5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



In [19]:
persons_df.select(expr("first_name"),expr('last_name'),expr('date_of_birth')).show(5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



### concatenate the first and last name into one column and calcualte salary increase

In [20]:
from pyspark.sql.functions import concat,concat_ws

In [21]:
persons_df.select(concat_ws(' ', col("first_name"), col("last_name")).alias("full_name"),
                  col("salary"), 
                (col("salary")*1.1).alias("salary_increase")).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36| 1609.695983886719|
|   Emelyne Blaza|3006.04|3306.6440429687505|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36| 3917.496118164063|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|3819.8928710937503|
+----------------+-------+------------------+
only showing top 10 rows



In [22]:
#USING EXPRESSION

persons_df.select(concat_ws(' ', col("first_name"), col("last_name")).alias("full_name"),
                  col("salary"), 
                  expr("salary *1.1").alias("salary_increase")).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36| 1609.695983886719|
|   Emelyne Blaza|3006.04|3306.6440429687505|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36| 3917.496118164063|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|3819.8928710937503|
+----------------+-------+------------------+
only showing top 10 rows



## Filter and Where Conditions

In [23]:
persons_df.filter('salary <= 3000').show(10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|
|  8|  Dominica|    Goodnow|    [Hearts Divided]|1147.76|http://dummyimage...|   1973-08-27| false|
|  9|     Emory|    Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 11|   Timothy|     Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|


                                                                                

In [24]:
persons_df.where('salary <= 3000').show(10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|
|  8|  Dominica|    Goodnow|    [Hearts Divided]|1147.76|http://dummyimage...|   1973-08-27| false|
|  9|     Emory|    Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 11|   Timothy|     Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|


In [25]:
#two conditions using the where function
persons_df.where((col('salary')<=3000) & (col("active") == True)).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  9|     Emory|  Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 16|   Margaux| Archbold|[And Now a Word f...|1013.75|http://dummyimage...|   1988-07-29|  true|
| 26|     Clive|      Lax|             [Rabid]|2126.87|http://dummyimage...|   1981-10-26|  true|
| 33|  Sherline|  Primett|   [Jungle Fighters]|2309.39|http://dummyimage...|   1972-07-23|  true|
| 34|     Davis|    Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 37|    Carlen|  Sharply|[Dr. Jekyll and M...|2051.85|http://dummyimage...|   2002-06-01|  true|
| 40|    Jordan|   L

In [26]:
from pyspark.sql.functions import year

In [27]:
persons_df.filter((year("date_of_birth") == 2000) | (year("date_of_birth") == 1994) ).show(10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| 15|    Feodor|Nancekivell|   [Monsoon Wedding]|2218.46|http://dummyimage...|   2000-10-07| false|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|
| 28|   Frankie|  Copestick|[Computer Wore Te...|1186.68|http://dummyimage...|   1994-03-09| false|
| 32|      Redd|   Akenhead|[Century of the D...| 2470.9|http://dummyimage...|   2000-06-05| false|
| 38|    Camile|       Mace|[Family Guy Prese...|3559.93|http://dummyimage...|   1994-12-12| false|
| 48|Maximilian|      Jonin|[The Last Shark, ...|3274.34|http://dummyimage...|   1994-09-16| false|
| 69|  Annabell|    Doughty|[Entertaining Ang...|2022.57|http://dummyimage...|   2000-09-03|  true|


In [28]:
from pyspark.sql.functions import array_contains

In [29]:
persons_df.where(array_contains(persons_df.fav_movies, "Land of the Lost")).show()

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| 11|   Timothy|   Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



## Distinct, Drop duplicates and orderby

In [30]:
from pyspark.sql.functions import count,desc

In [31]:
persons_df.select('active').distinct().show()

+------+
|active|
+------+
|  true|
| false|
+------+



In [32]:
(persons_df.select(col('first_name'), year(col('date_of_birth')).alias('year'), col('active')).orderBy('year','first_name')).show(10)

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|       Sky|1971| false|
|   Timothy|1971| false|
|    Lucita|1972|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|     Toddy|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
+----------+----+------+
only showing top 10 rows



In [33]:
#check if any records repeat themselves 

## Rows and unions 

In [34]:
from pyspark.sql import Row

In [35]:
new_person  = Row(101,"robert","jack",['Men in black','X_men'],4300.64,"http://someimage.com","1995-08-14",True)
new_peron_list = [Row(102,"daniel","jonne",['Men in black','X_men'],4300.64,"http://someimage.com","1995-08-14",True),
                  Row(103,"ryan","marris",['Men in black','X_men'],4300.64,"http://someimage.com","1995-08-14",True)]

In [36]:
new_peron_list.append(new_person)

In [37]:
print(new_peron_list)

[<Row(102, 'daniel', 'jonne', ['Men in black', 'X_men'], 4300.64, 'http://someimage.com', '1995-08-14', True)>, <Row(103, 'ryan', 'marris', ['Men in black', 'X_men'], 4300.64, 'http://someimage.com', '1995-08-14', True)>, <Row(101, 'robert', 'jack', ['Men in black', 'X_men'], 4300.64, 'http://someimage.com', '1995-08-14', True)>]


In [38]:
type(new_person)

pyspark.sql.types.Row

In [39]:
cols = persons_df.columns

In [40]:
new_person_df = spark.createDataFrame(new_peron_list,cols)

In [41]:
new_person_df.show()

                                                                                

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|102|    daniel|    jonne|[Men in black, X_...|4300.64|http://someimage.com|   1995-08-14|  true|
|103|      ryan|   marris|[Men in black, X_...|4300.64|http://someimage.com|   1995-08-14|  true|
|101|    robert|     jack|[Men in black, X_...|4300.64|http://someimage.com|   1995-08-14|  true|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



In [42]:
##We can combine new data to old using union

add_person_df = persons_df.union(new_person_df)

add_person_df.sort(desc('id')).show(10)

                                                                                

+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies|            salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
|103|      ryan|   marris|[Men in black, X_...|           4300.64|http://someimage.com|   1995-08-14|  true|
|102|    daniel|    jonne|[Men in black, X_...|           4300.64|http://someimage.com|   1995-08-14|  true|
|101|    robert|     jack|[Men in black, X_...|           4300.64|http://someimage.com|   1995-08-14|  true|
|100|    Virgie| Domanski|[Horseman, The, S...| 2165.929931640625|http://dummyimage...|   2002-01-05|  true|
| 99|   Rozalie|   Wannop|[Suddenly, The No...|1259.6400146484375|http://dummyimage...|   1997-03-25| false|
| 98|     Davin|     Labb|[Viva Riva!, Kill...| 1452.739990234375|http://dummyimage...|   1988-01-27|  true|
| 97|      Rodi|   

## Adding renaming and dropping columns 

In [43]:
from pyspark.sql.functions import round

In [44]:
aug_person_df1 = persons_df.withColumn("salary_increase", expr("salary*1.1"))
aug_person_df1.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|   salary_increase|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true| 1609.695983886719|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|3306.6440429687505|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|1565.1680053710938|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true| 3917.496118164063|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|  5428.35712890625|
|  6|    Oswald| Petrolli|[Wing and the Thi...|1153.23|http://du

In [45]:
aug_person_df1.columns

['id',
 'first_name',
 'last_name',
 'fav_movies',
 'salary',
 'image_url',
 'date_of_birth',
 'active',
 'salary_increase']

In [46]:
#extract the year from DOB and rename fav movies and dropped salary increase

aug_person_df2 = (aug_person_df1
                  .withColumn("birth_year",year("date_of_birth"))
                  .withColumnRenamed("fav_movies","movies")
                  .withColumn("salary_x10",round(col("salary_increase"),2))
                  .drop("salary_increase"))

In [47]:
aug_person_df2.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+----------+
| id|first_name|last_name|              movies| salary|           image_url|date_of_birth|active|birth_year|salary_x10|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+----------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|      1991|    1609.7|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|      1991|   3306.64|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|      1990|   1565.17|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|      1987|    3917.5|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|      1992|   5428.36|
|  6|    Oswald| Petrolli|[Wing and the 

## Working with missing data 

In [48]:
bad_movies_list = [Row(None, None, None),
                   Row(None, None, 2020),
                   Row("John Doe", "Awesome Movie", None),
                   Row(None, "Awesome Movie", 2021),
                   Row("Mary Jane", None, 2019),
                   Row("Vikter Duplaix", "Not another teen movie", 2001)]

In [49]:
bad_movies_list

[<Row(None, None, None)>,
 <Row(None, None, 2020)>,
 <Row('John Doe', 'Awesome Movie', None)>,
 <Row(None, 'Awesome Movie', 2021)>,
 <Row('Mary Jane', None, 2019)>,
 <Row('Vikter Duplaix', 'Not another teen movie', 2001)>]

In [50]:
bad_movies_columns = ['actor_name','movie_title','produced_year']

In [51]:
bad_movies_df = spark.createDataFrame(bad_movies_list,bad_movies_columns)

In [52]:
bad_movies_df.show(truncate=False)

+--------------+----------------------+-------------+
|actor_name    |movie_title           |produced_year|
+--------------+----------------------+-------------+
|null          |null                  |null         |
|null          |null                  |2020         |
|John Doe      |Awesome Movie         |null         |
|null          |Awesome Movie         |2021         |
|Mary Jane     |null                  |2019         |
|Vikter Duplaix|Not another teen movie|2001         |
+--------------+----------------------+-------------+



In [53]:
#drop rows with any null values 
bad_movies_df.na.drop().show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [54]:
#drop rows with any null values 
bad_movies_df.na.drop("any").show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [55]:
#drop rows with all null values for all columns
bad_movies_df.na.drop("all").show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [56]:
#filtered df for where actor name is not null
bad_movies_df.filter(col("actor_name")!= 'null').show()

                                                                                

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|      John Doe|       Awesome Movie|         null|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [57]:
#filtered df for where actor name is not null
bad_movies_df.filter(col("actor_name").isNull() != True).show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|      John Doe|       Awesome Movie|         null|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [58]:
##summary of columns in a df using describe 

bad_movies_df.describe("produced_year").show()

[Stage 42:>                                                         (0 + 8) / 8]

+-------+-----------------+
|summary|    produced_year|
+-------+-----------------+
|  count|                4|
|   mean|          2015.25|
| stddev|9.535023160258536|
|    min|             2001|
|    max|             2021|
+-------+-----------------+



                                                                                

In [59]:
bad_movies_df.describe("actor_name").show()



+-------+--------------+
|summary|    actor_name|
+-------+--------------+
|  count|             3|
|   mean|          null|
| stddev|          null|
|    min|      John Doe|
|    max|Vikter Duplaix|
+-------+--------------+



                                                                                

## creating spark functions to use 

In [60]:
from pyspark.sql.functions import udf

In [61]:
student_list = [("joe",90),
                ("jack",95),
                ("jill",100)]

In [62]:
student_cols = ["name","score"]

In [63]:
student_df = spark.createDataFrame(student_list,student_cols)

In [64]:
student_df.show()

+----+-----+
|name|score|
+----+-----+
| joe|   90|
|jack|   95|
|jill|  100|
+----+-----+



#### Create a pyhton function called letter grade

In [65]:
def lettergrade(score:int):
    
    grade = ""
    
    if score > 100:
        grade = "cheating"
        
    elif score >=90:
        grade = "A"
        
    elif score >=80:
        grade = "B"
        
    elif score >=70:
        grade = "C"
        
    else:
        grade = "F"
    
    return grade

In [66]:
print(lettergrade(56))

F


In [67]:
#Convert thid function to a udf
#the Udf are poorly optimized and should be avooided as much as possible
lettergradeudf = udf(lettergrade)

In [68]:
student_df.select("name","score", lettergradeudf(col("score")).alias("grade")).show()

[Stage 53:>                                                         (0 + 3) / 3]

+----+-----+-----+
|name|score|grade|
+----+-----+-----+
| joe|   90|    A|
|jack|   95|    A|
|jill|  100|    A|
+----+-----+-----+



                                                                                

## Aggregations

In [71]:
filepath = "./data/flights/flight-summary.csv"

flights_df = (spark.read.format("csv")
              .option('header',True)
              .option('inferSchema',True)
              .load(filepath))

In [72]:
flights_df.count()

4693

In [73]:
flights_df.show(10,truncate=False)

+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+-----+
|origin_code|origin_airport                                  |origin_city |origin_state|dest_code|dest_airport                                |dest_city       |dest_state|count|
+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+-----+
|BQN        |Rafael Hernández Airport                        |Aguadilla   |PR          |MCO      |Orlando International Airport               |Orlando         |FL        |441  |
|PHL        |Philadelphia International Airport              |Philadelphia|PA          |MCO      |Orlando International Airport               |Orlando         |FL        |4869 |
|MCI        |Kansas City International Airport               |Kansas City |MO          |IAH      |George Bush 

In [74]:
flights_df.printSchema()

root
 |-- origin_code: string (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- dest_code: string (nullable = true)
 |-- dest_airport: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- count: integer (nullable = true)



In [75]:
flights_df = (flights_df
             .withColumnRenamed("count", "flight_count"))

In [76]:
flights_df.show(10)

+-----------+--------------------+------------+------------+---------+--------------------+----------------+----------+------------+
|origin_code|      origin_airport| origin_city|origin_state|dest_code|        dest_airport|       dest_city|dest_state|flight_count|
+-----------+--------------------+------------+------------+---------+--------------------+----------------+----------+------------+
|        BQN|Rafael Hernández ...|   Aguadilla|          PR|      MCO|Orlando Internati...|         Orlando|        FL|         441|
|        PHL|Philadelphia Inte...|Philadelphia|          PA|      MCO|Orlando Internati...|         Orlando|        FL|        4869|
|        MCI|Kansas City Inter...| Kansas City|          MO|      IAH|George Bush Inter...|         Houston|        TX|        1698|
|        SPI|Abraham Lincoln C...| Springfield|          IL|      ORD|Chicago O'Hare In...|         Chicago|        IL|         998|
|        SNA|John Wayne Airpor...|   Santa Ana|          CA|      PHX

### Count(col) and count distinct(col)

In [77]:
from pyspark.sql.functions import count,count_distinct

In [79]:
#count for origin and origin airport count excludes null values

flights_df.select(count('origin_airport'),count('dest_airport')).show()

[Stage 64:>                                                         (0 + 1) / 1]

+---------------------+-------------------+
|count(origin_airport)|count(dest_airport)|
+---------------------+-------------------+
|                 4693|               4693|
+---------------------+-------------------+



                                                                                

In [80]:
bad_movies_df.show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         null|
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [81]:
bad_movies_df.select(count("movie_title"),count("actor_name"),count("produced_year")).show()

[Stage 70:>                                                         (0 + 8) / 8]

+------------------+-----------------+--------------------+
|count(movie_title)|count(actor_name)|count(produced_year)|
+------------------+-----------------+--------------------+
|                 3|                3|                   4|
+------------------+-----------------+--------------------+



                                                                                

In [84]:
#to get the number of unique values in the column
flights_df.select(count_distinct('origin_airport'),count_distinct('dest_airport'), count("*").alias('total_number')).show()

[Stage 85:>                                                         (0 + 1) / 1]

+------------------------------+----------------------------+------------+
|count(DISTINCT origin_airport)|count(DISTINCT dest_airport)|total_number|
+------------------------------+----------------------------+------------+
|                           322|                         322|        4693|
+------------------------------+----------------------------+------------+



                                                                                

### min(col), max(col), sum(col), sum distinct(col), avg

In [99]:
from pyspark.sql.functions import min,max,sum, sum_distinct,avg

In [94]:
#shows the busiest flight route
flights_df.select(min(col("flight_count")), max(col("flight_count"))).show()

+-----------------+-----------------+
|min(flight_count)|max(flight_count)|
+-----------------+-----------------+
|                1|            13744|
+-----------------+-----------------+



In [96]:
#sum of flight counts
flights_df.select(sum(col("flight_count"))).show()

+-----------------+
|sum(flight_count)|
+-----------------+
|          5332914|
+-----------------+



In [100]:
flights_df.select(sum_distinct(col("flight_count"))).show()

+--------------------------+
|sum(DISTINCT flight_count)|
+--------------------------+
|                   3612257|
+--------------------------+



In [93]:
#find the flight route thats is the busiest
flights_df.where(col("flight_count") == 13744).show(truncate=False)

+-----------+-----------------------------------+-------------+------------+---------+---------------------------------+-----------+----------+------------+
|origin_code|origin_airport                     |origin_city  |origin_state|dest_code|dest_airport                     |dest_city  |dest_state|flight_count|
+-----------+-----------------------------------+-------------+------------+---------+---------------------------------+-----------+----------+------------+
|SFO        |San Francisco International Airport|San Francisco|CA          |LAX      |Los Angeles International Airport|Los Angeles|CA        |13744       |
+-----------+-----------------------------------+-------------+------------+---------+---------------------------------+-----------+----------+------------+



In [101]:
#average flight count
flights_df.select(avg(col("flight_count"))).show()

+------------------+
| avg(flight_count)|
+------------------+
|1136.3549968037503|
+------------------+



### Agregations with grouping

In [102]:
flights_df.groupBy("origin_airport").count().orderBy("count", ascending=False).show(5,False)

                                                                                

+------------------------------------------------+-----+
|origin_airport                                  |count|
+------------------------------------------------+-----+
|Hartsfield-Jackson Atlanta International Airport|169  |
|Chicago O'Hare International Airport            |162  |
|Dallas/Fort Worth International Airport         |148  |
|Denver International Airport                    |139  |
|Minneapolis-Saint Paul International Airport    |120  |
+------------------------------------------------+-----+
only showing top 5 rows



In [106]:
#find the maxinmun flight count per each origin airport in descending order
(flights_df.groupBy("origin_airport")
          .agg(max("flight_count").alias("max_flight_count"))
          .orderBy("max_flight_count", ascending=False)).show(5,False)

+----------------------------------------------------------------------+----------------+
|origin_airport                                                        |max_flight_count|
+----------------------------------------------------------------------+----------------+
|San Francisco International Airport                                   |13744           |
|Los Angeles International Airport                                     |13457           |
|John F. Kennedy International Airport (New York International Airport)|12016           |
|McCarran International Airport                                        |9715            |
|LaGuardia Airport (Marine Air Terminal)                               |9639            |
+----------------------------------------------------------------------+----------------+
only showing top 5 rows



In [105]:
#count of records associated with the city of califronia grouped by city

(flights_df.groupBy("origin_state", "origin_city")
           .count()
           .where(col("origin_state")=="CA")
           .orderBy("count",ascending=False)
).show(10,False)

+------------+-------------+-----+
|origin_state|origin_city  |count|
+------------+-------------+-----+
|CA          |San Francisco|80   |
|CA          |Los Angeles  |80   |
|CA          |San Diego    |47   |
|CA          |Oakland      |35   |
|CA          |Sacramento   |27   |
|CA          |San Jose     |25   |
|CA          |Santa Ana    |22   |
|CA          |Ontario      |14   |
|CA          |Long Beach   |12   |
|CA          |Palm Springs |12   |
+------------+-------------+-----+
only showing top 10 rows



In [108]:
#MULTIPLE AGGREGATIONS FOR A SINGLE COLUMN 

(flights_df.groupBy("origin_airport")
          .agg(max("flight_count"),
               min("flight_count"),
               sum("flight_count"),
               count("flight_count"))).show(5,False)

+-------------------------------------------------+-----------------+-----------------+-----------------+-------------------+
|origin_airport                                   |max(flight_count)|min(flight_count)|sum(flight_count)|count(flight_count)|
+-------------------------------------------------+-----------------+-----------------+-----------------+-------------------+
|Melbourne International Airport                  |1332             |1332             |1332             |1                  |
|San Diego International Airport (Lindbergh Field)|6942             |4                |70207            |46                 |
|Eppley Airfield                                  |2083             |1                |16753            |21                 |
|Kahului Airport                                  |8313             |67               |20627            |18                 |
|Austin-Bergstrom International Airport           |4674             |8                |42067            |41           