In [3]:
import pyspark
from pyspark.sql import SparkSession
import numpy as np

In [4]:
spark = SparkSession.builder.getOrCreate()

In [4]:
student_data = [('Ahmed',25), ('Mona',30), ('Shahd',17), ('Shahd',22), ('Mona',16)]

In [5]:
df_student = spark.createDataFrame(student_data, ["Name", "Age"])
df_student.show()

+-----+---+
| Name|Age|
+-----+---+
|Ahmed| 25|
| Mona| 30|
|Shahd| 17|
|Shahd| 22|
| Mona| 16|
+-----+---+



In [7]:
df_student.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [8]:
df_student[0]

Column<'Name'>

In [9]:
df_student.Name

Column<'Name'>

In [11]:
df_student20 = df_student.filter(df_student["Age"] > 20)
df_student20.show()

+-----+---+
| Name|Age|
+-----+---+
|Ahmed| 25|
| Mona| 30|
|Shahd| 22|
+-----+---+



In [13]:
df_people = spark.read.json("/data/people.json")
df_people.show()

+----+------+
| age|  name|
+----+------+
|NULL| Ahmed|
|  30|  Mona|
|  19| Shahd|
|  19| Shahd|
|  25|Khaled|
+----+------+



In [20]:
df_avg = df_people.groupBy('name').avg('age')
df_avg.show()   

+------+--------+
|  name|avg(age)|
+------+--------+
|Khaled|    25.0|
| Shahd|    19.0|
| Ahmed|    NULL|
|  Mona|    30.0|
+------+--------+



In [None]:
df_avg = df_people.groupBy('name').avg('age')
df_avg.show()   

In [24]:
import pyspark.sql.functions as fun 

In [None]:
df_avg = df_people.groupBy('name').avg().count()
df_avg.show()   

In [28]:
df_avg_sum = df_people.groupBy('name').agg(
    fun.avg('age').alias('avg_age')
    ,fun.sum('age').alias('sum_age')
    ,fun.count('age').alias('count_age')
                                           )
df_avg_sum.show()   

+------+-------+-------+---------+
|  name|avg_age|sum_age|count_age|
+------+-------+-------+---------+
|Khaled|   25.0|     25|        1|
| Shahd|   19.0|     38|        2|
| Ahmed|   NULL|   NULL|        0|
|  Mona|   30.0|     30|        1|
+------+-------+-------+---------+



In [29]:
df_names = df_people.select('name')
df_names.show()

+------+
|  name|
+------+
| Ahmed|
|  Mona|
| Shahd|
| Shahd|
|Khaled|
+------+



In [33]:
df_us = spark.read.csv('/data/mnm_dataset.csv', header=True, inferSchema=False)
df_us.printSchema()

root
 |-- State: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Count: string (nullable = true)



In [32]:
df_us = spark.read.csv('/data/mnm_dataset.csv', header=True, inferSchema=True)
df_us.printSchema()

root
 |-- State: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Count: integer (nullable = true)



In [38]:
df_us.show()

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   TX|   Red|   20|
|   NV|  Blue|   66|
|   CO|  Blue|   79|
|   OR|  Blue|   71|
|   WA|Yellow|   93|
|   WY|  Blue|   16|
|   CA|Yellow|   53|
|   WA| Green|   60|
|   OR| Green|   71|
|   TX| Green|   68|
|   NV| Green|   59|
|   AZ| Brown|   95|
|   WA|Yellow|   20|
|   AZ|  Blue|   75|
|   OR| Brown|   72|
|   NV|   Red|   98|
|   WY|Orange|   45|
|   CO|  Blue|   52|
|   TX| Brown|   94|
|   CO|   Red|   82|
+-----+------+-----+
only showing top 20 rows



In [45]:
df_us1 = df_us.groupBy('State', 'Color').agg(fun.sum('Count').alias('Total'),)
df_us1.show()  

+-----+------+-------+
|State| Color|  Total|
+-----+------+-------+
|   WY| Green|94339.0|
|   NV|   Red|89346.0|
|   UT|  Blue|89977.0|
|   WA|Orange|91521.0|
|   NM| Green|91160.0|
|   CA|  Blue|89123.0|
|   WA|   Red|93332.0|
|   NV| Brown|92478.0|
|   AZ| Green|91882.0|
|   CA|   Red|91527.0|
|   AZ|Orange|91684.0|
|   CO|  Blue|93412.0|
|   NM|Orange|91251.0|
|   NM|Yellow|92747.0|
|   WY|Orange|87956.0|
|   UT|Yellow|89264.0|
|   WY|   Red|91768.0|
|   OR|  Blue|90526.0|
|   NV|Orange|93929.0|
|   AZ|Yellow|90946.0|
+-----+------+-------+
only showing top 20 rows



In [46]:
df_us2 = df_us.sort('State')
df_us2.show()

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   AZ|Orange|   50|
|   AZ| Green|   92|
|   AZ|Yellow|   49|
|   AZ|Yellow|   83|
|   AZ|Yellow|   80|
|   AZ| Brown|   95|
|   AZ|Yellow|   80|
|   AZ|   Red|   59|
|   AZ|  Blue|   24|
|   AZ|Orange|   54|
|   AZ|Yellow|   41|
|   AZ|  Blue|   75|
|   AZ| Green|   99|
|   AZ|Yellow|   72|
|   AZ| Brown|   90|
|   AZ|Orange|   53|
|   AZ|Yellow|   23|
|   AZ|  Blue|   82|
|   AZ| Brown|   41|
|   AZ| Green|   89|
+-----+------+-----+
only showing top 20 rows



In [47]:
df_us2 = df_us.sort('State' , ascending=False)
df_us2.show()

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   WY|   Red|   22|
|   WY| Green|   39|
|   WY|Yellow|   42|
|   WY|Orange|   45|
|   WY|  Blue|   59|
|   WY|Yellow|   48|
|   WY|  Blue|   53|
|   WY|Yellow|   56|
|   WY| Brown|   87|
|   WY|Orange|   23|
|   WY| Brown|   43|
|   WY| Brown|   30|
|   WY|Orange|   81|
|   WY|  Blue|   16|
|   WY|  Blue|   80|
|   WY| Green|   15|
|   WY|Orange|   99|
|   WY| Brown|   85|
|   WY| Brown|   36|
|   WY|Yellow|   78|
+-----+------+-----+
only showing top 20 rows



In [48]:
from pyspark.sql.types import *

In [66]:
schema = StructType([
    StructField("ID", StringType()),
    StructField("Names", StringType()),
    StructField("Sales", IntegerType())
])

In [67]:
df_sales = spark.read.csv('/data/NullData.csv', header=True, schema=schema)
df_sales.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Names: string (nullable = true)
 |-- Sales: integer (nullable = true)



In [68]:
df_sales.show()

+----+-----+-----+
|  ID|Names|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL| NULL|
|emp4|Cindy| NULL|
+----+-----+-----+



In [70]:
myschema = 'ID int not null, Names string, Sales int'

df_sales = spark.read.csv('/data/NullData.csv', header=True, schema=myschema)
df_sales.printSchema()
df_sales.show()

root
 |-- ID: integer (nullable = true)
 |-- Names: string (nullable = true)
 |-- Sales: integer (nullable = true)

+----+-----+-----+
|  ID|Names|Sales|
+----+-----+-----+
|NULL| John| NULL|
|NULL| NULL| NULL|
|NULL| NULL| NULL|
|NULL|Cindy| NULL|
+----+-----+-----+



In [72]:
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING,`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

In [73]:
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter","LinkedIn"]],
[2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter","LinkedIn"]],
[3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web","twitter", "FB", "LinkedIn"]],
[4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,["twitter", "FB"]],
[5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web","twitter", "FB", "LinkedIn"]],
[6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,["twitter", "LinkedIn"]]]

In [81]:
df_blog = spark.createDataFrame(data)
df_blog.printSchema()

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: long (nullable = true)
 |-- _7: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [82]:
df_blog = spark.createDataFrame(data, schema)
df_blog.printSchema()
df_blog.show()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25

In [84]:
df_campaigns = df_blog.select('Campaigns')
df_campaigns.show()

+--------------------+
|           Campaigns|
+--------------------+
| [twitter, LinkedIn]|
| [twitter, LinkedIn]|
|[web, twitter, FB...|
|       [twitter, FB]|
|[web, twitter, FB...|
| [twitter, LinkedIn]|
+--------------------+



In [86]:
df_hits = df_blog.select(df_blog['Hits'] * 10)
df_hits.show()

+-----------+
|(Hits * 10)|
+-----------+
|      45350|
|      89080|
|      76590|
|     105680|
|     405780|
|     255680|
+-----------+



In [92]:
df_hits = df_blog.select((fun.col('Hits') * 10).alias('Hits'))
df_hits.show()

+------+
|  Hits|
+------+
| 45350|
| 89080|
| 76590|
|105680|
|405780|
|255680|
+------+



In [107]:
df_names = df_blog.withColumn('Full Name', fun.concat(fun.col('First'),  fun.col('Last')))
df_names.show()

+---+---------+-------+-----------------+---------+-----+--------------------+------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|   Full Name|
+---+---------+-------+-----------------+---------+-----+--------------------+------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|  JulesDamji|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]| BrookeWenig|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|    DennyLee|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|TathagataDas|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|MateiZaharia|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|  ReynoldXin|
+---+---------+-------+-----------------+---------+-----+--------------------+------------+



In [108]:
df_names = df_blog.withColumn('Full Name', fun.concat_ws(' ' ,fun.col('First'),  fun.col('Last')))
df_names.show()

+---+---------+-------+-----------------+---------+-----+--------------------+-------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|    Full Name|
+---+---------+-------+-----------------+---------+-----+--------------------+-------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|  Jules Damji|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]| Brooke Wenig|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|    Denny Lee|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|Tathagata Das|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|Matei Zaharia|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|  Reynold Xin|
+---+---------+-------+-----------------+---------+-----+--------------------+-------------+



In [112]:
df_blog1 = df_blog.sort(df_blog['Hits'], ascending=False)
df_blog1.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [113]:
df_blog1 = df_blog.sort(df_blog['Hits'], ascending=False)
df_blog1.show(truncate=False)

+---+---------+-------+-----------------+---------+-----+----------------------------+
|Id |First    |Last   |Url              |Published|Hits |Campaigns                   |
+---+---------+-------+-----------------+---------+-----+----------------------------+
|5  |Matei    |Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB, LinkedIn]|
|6  |Reynold  |Xin    |https://tinyurl.6|3/2/2015 |25568|[twitter, LinkedIn]         |
|4  |Tathagata|Das    |https://tinyurl.4|5/12/2018|10568|[twitter, FB]               |
|2  |Brooke   |Wenig  |https://tinyurl.2|5/5/2018 |8908 |[twitter, LinkedIn]         |
|3  |Denny    |Lee    |https://tinyurl.3|6/7/2019 |7659 |[web, twitter, FB, LinkedIn]|
|1  |Jules    |Damji  |https://tinyurl.1|1/4/2016 |4535 |[twitter, LinkedIn]         |
+---+---------+-------+-----------------+---------+-----+----------------------------+



In [116]:
df_sales = spark.read.csv('/data/NullData.csv', header=True, inferSchema=True)
df_sales.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [117]:
df_sales.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [118]:
df_sales.na.drop('any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [119]:
df_sales.na.drop('all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [120]:
df_sales.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [122]:
df_sales.na.fill('No').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2|   No| NULL|
|emp3|   No|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [143]:
df_mean = df_sales.select(fun.mean(df_sales['Sales']))
mean = df_mean.collect()[0][0]

In [146]:
df_sales.na.fill({"Sales": mean}).show()


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| NULL|400.5|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [149]:
df_sales.na.fill({
    "Sales": mean,
    'Name': 'No'
                  }).show()


+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2|   No|400.5|
|emp3|   No|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [154]:
df = (df_sales.select('Name')
    .where(df_sales['Sales'].isNotNull())
)
df.show()

+-----+
| Name|
+-----+
| NULL|
|Cindy|
+-----+



In [156]:
df = (df_sales.select('Name')
    .where(df_sales['Sales'].isNotNull())
    .count()
)
df

2