In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col, desc, asc, avg, sum, max, min, mean, count, round

spark = SparkSession.builder.appName("local_spark").getOrCreate()

In [3]:
spark

1)	Read data from CSV

inferSchema=True assumes datatype

In [8]:
sparkdf = spark.read.csv('students.csv',header=True)

2)	Print 5 rows ( !!! head() method behaves differently in pySpark)

Matrix

In [9]:
sparkdf.head(5)

[Row(id='1', name='Emily Hardie', class='Four', mark='75', gender='female'),
 Row(id='2', name='John Star', class='Three', mark='85', gender='male'),
 Row(id='3', name='Arnold Walker ', class='Three', mark='55', gender='male'),
 Row(id='4', name='Reana Talu', class='Four', mark='60', gender='female'),
 Row(id='5', name='Sidona Williams', class='Four', mark='60', gender='female')]

Table

In [10]:
sparkdf.show(5)

+---+---------------+-----+----+------+
| id|           name|class|mark|gender|
+---+---------------+-----+----+------+
|  1|   Emily Hardie| Four|  75|female|
|  2|      John Star|Three|  85|  male|
|  3| Arnold Walker |Three|  55|  male|
|  4|     Reana Talu| Four|  60|female|
|  5|Sidona Williams| Four|  60|female|
+---+---------------+-----+----+------+
only showing top 5 rows



In [None]:
# sparkdf2=spark.read.format('csv').option('inferSchema',True).option('header',True).option('sep',True).load('students.csv')

3)	Show schema of the spark data frame.

In [11]:
sparkdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- mark: string (nullable = true)
 |-- gender: string (nullable = true)



4)	Update schema after reading form CSV. (StructField)

In [15]:
schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
        StructField("class", StringType(), True),
        StructField("mark", IntegerType(), True),
        StructField("gender", StringType(), True)])

df = spark.read.csv('students.csv', header=True, schema=schema)
df.printSchema()
df.show()

Updates schema without reading file again

In [14]:
df.withColumn('mark', col('mark').cast(StringType())).printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- class: string (nullable = true)
 |-- MARK: string (nullable = true)
 |-- gender: string (nullable = true)



5)	Give schema as option while reading from CSV

In [16]:
df = spark.read.csv('students.csv', header=True, schema=schema)

6)	Show columns and show summary statistics of numeric columns

No quartiles

In [17]:
df.describe().show()

+-------+------------------+------------+-----+-----------------+------+
|summary|                id|        name|class|             mark|gender|
+-------+------------------+------------+-----+-----------------+------+
|  count|                35|          35|   35|               35|    35|
|   mean|              18.0|        null| null|75.51428571428572|  null|
| stddev|10.246950765959598|        null| null|13.95448784772974|  null|
|    min|                 1|   Alex John| Four|               48|female|
|    max|                35|Wookie Davey|  Two|               96|  male|
+-------+------------------+------------+-----+-----------------+------+



With quartiles

In [18]:
df.summary().show()

+-------+------------------+------------+-----+-----------------+------+
|summary|                id|        name|class|             mark|gender|
+-------+------------------+------------+-----+-----------------+------+
|  count|                35|          35|   35|               35|    35|
|   mean|              18.0|        null| null|75.51428571428572|  null|
| stddev|10.246950765959598|        null| null|13.95448784772974|  null|
|    min|                 1|   Alex John| Four|               48|female|
|    25%|                 9|        null| null|               60|  null|
|    50%|                18|        null| null|               79|  null|
|    75%|                27|        null| null|               88|  null|
|    max|                35|Wookie Davey|  Two|               96|  male|
+-------+------------------+------------+-----+-----------------+------+



In [56]:
numeric_cols = [c for c,t in df.dtypes if t in ('int')] # Finds only columns that have an interger data type

df.select(numeric_cols).describe().show()

+-------+------------------+-----------------+
|summary|                id|             mark|
+-------+------------------+-----------------+
|  count|                35|               35|
|   mean|              18.0|75.51428571428572|
| stddev|10.246950765959598|13.95448784772974|
|    min|                 1|               48|
|    max|                35|               96|
+-------+------------------+-----------------+



7)	Read from JSON

In [20]:
json_spark_df = spark.read.json('people.json')
json_spark_df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  36|   Alan|
|  19| Justin|
+----+-------+



8)	Subset the Dataframe for one and then more than one columns. 

In [30]:
id = df.select('id')

# id.show()

id_name_mark = df.select(['id','name','mark'])

id_name_mark.show(5)

+---+---------------+----+
| id|           name|mark|
+---+---------------+----+
|  1|   Emily Hardie|  75|
|  2|      John Star|  85|
|  3| Arnold Walker |  55|
|  4|     Reana Talu|  60|
|  5|Sidona Williams|  60|
+---+---------------+----+
only showing top 5 rows



In [29]:
df['id']

Column<'id'>

9)	Filter Data frame based on condition.\
•	Filter according to gender\
•	Filter according to mark > 50\
•	Filter by multiple conditions

In [34]:
# df.filter(df.gender=='male').show()
# df.filter(df.mark>50).show()
df.filter((df['gender'] == 'male') & (df.mark > 50)).show()

+---+----------------+-----+----+------+
| id|            name|class|mark|gender|
+---+----------------+-----+----+------+
|  2|       John Star|Three|  85|  male|
|  3|  Arnold Walker |Three|  55|  male|
|  6|       Alex John| Four|  55|  male|
|  7|    Robert John |Three|  78|  male|
|  8|       Lee Malva| Four|  85|  male|
|  9|    Wookie Davey|  Two|  78|  male|
| 15|       Scott Row| Four|  88|  male|
| 16|     Daniel Page| Four|  88|  male|
| 17|  James Williams|Three|  54|  male|
| 18|Martin Johnston | Four|  75|  male|
| 23|       Sam Adan |Three|  79|  male|
| 24|   Nova Prescott|  Two|  78|  male|
| 25|  William Taylor| Four|  88|  male|
| 26|   Laurin Wilson|Three|  79|  male|
| 29|         Ben Day| Four|  55|  male|
| 31|      Chris Ball| Four|  88|  male|
| 34|   Garry Richard|Three|  69|  male|
+---+----------------+-----+----+------+



10)	Add new column,\
•	New column name: corrected mark\
•	It has values mark + 3

In [35]:
df.withColumn('corrected_mark', (df.mark + 3)).show()

+---+----------------+-----+----+------+--------------+
| id|            name|class|mark|gender|corrected_mark|
+---+----------------+-----+----+------+--------------+
|  1|    Emily Hardie| Four|  75|female|            78|
|  2|       John Star|Three|  85|  male|            88|
|  3|  Arnold Walker |Three|  55|  male|            58|
|  4|      Reana Talu| Four|  60|female|            63|
|  5| Sidona Williams| Four|  60|female|            63|
|  6|       Alex John| Four|  55|  male|            58|
|  7|    Robert John |Three|  78|  male|            81|
|  8|       Lee Malva| Four|  85|  male|            88|
|  9|    Wookie Davey|  Two|  78|  male|            81|
| 10|      Diane Rose|  Two|  55|female|            58|
| 11|    Holly Daives|  Two|  89|female|            92|
| 12|        Eva Cup |Three|  94|female|            97|
| 13| Victoria Mathew| Four|  88|female|            91|
| 14|       Iris Zhao|  Two|  88|female|            91|
| 15|       Scott Row| Four|  88|  male|        

11)	Groupby gender\
•	Calculate the average mark for each gender\
•	Max\
•	Min

In [39]:
# df.groupBy('gender').avg('mark').show()
# df.groupBy('gender').min('mark').show()
# df.groupBy('gender').max('mark').show()

df.groupBy('gender')\
.agg(
(round(avg('mark'),2).alias('Avg')),
max('mark').alias('Max'),
min('mark').alias('Min'))\
.show()

+------+-----+---+---+
|gender|  Avg|Max|Min|
+------+-----+---+---+
|female|77.53| 96| 55|
|  male|73.61| 88| 48|
+------+-----+---+---+



12. Aggregation:\
•	Calculate the average mark of all students.

In [42]:
df.groupBy().avg('mark').show()

+-----------------+
|        avg(mark)|
+-----------------+
|75.51428571428572|
+-----------------+



13)	Order by 
- “class”
- “mark”
- “mark” and descending order.

In [57]:
# sparkdf.orderBy('class').show()
# sparkdf.orderBy(asc('mark')).show()
sparkdf.orderBy(desc('mark')).show()
sparkdf.orderBy('mark', ascending=False).show()

+---+---------------+-----+----+------+
| id|           name|class|mark|gender|
+---+---------------+-----+----+------+
| 33|  Elisa Richard| Four|  96|female|
| 12|       Eva Cup |Three|  94|female|
| 32|       Ela Love| Four|  90|female|
| 11|   Holly Daives|  Two|  89|female|
| 13|Victoria Mathew| Four|  88|female|
| 16|    Daniel Page| Four|  88|  male|
| 35|     Ria Wright|  Two|  88|female|
| 14|      Iris Zhao|  Two|  88|female|
| 15|      Scott Row| Four|  88|  male|
| 25| William Taylor| Four|  88|  male|
| 31|     Chris Ball| Four|  88|  male|
| 28| Emily Thompson| Four|  86|female|
|  2|      John Star|Three|  85|  male|
|  8|      Lee Malva| Four|  85|  male|
| 27|Fatemah Abraham|Three|  81|female|
| 26|  Laurin Wilson|Three|  79|  male|
| 30|    Rabiya Khan|Three|  79|female|
| 23|      Sam Adan |Three|  79|  male|
|  7|   Robert John |Three|  78|  male|
|  9|   Wookie Davey|  Two|  78|  male|
+---+---------------+-----+----+------+
only showing top 20 rows

+---+---------

14)	Access to specific row( hint: collect() method)  and then convert it to dictionary. 

In [53]:
# first_row = sparkdf.collect()[0]
# my_dict = first_row.asDict()
# print(first_row)
# print(my_dict)
rows = df.filter(df.mark > 90).collect()
for row in rows:
    display(row.asDict())
    print(row.asDict())

{'id': 12,
 'name': 'Eva Cup ',
 'class': 'Three',
 'mark': 94,
 'gender': 'female'}

{'id': 12, 'name': 'Eva Cup ', 'class': 'Three', 'mark': 94, 'gender': 'female'}


{'id': 33,
 'name': 'Elisa Richard',
 'class': 'Four',
 'mark': 96,
 'gender': 'female'}

{'id': 33, 'name': 'Elisa Richard', 'class': 'Four', 'mark': 96, 'gender': 'female'}


15. Create a view from the dataframe and filter it by using SQL syntax.

In [55]:
df.createOrReplaceTempView('students')
spark.sql(
    '''
    SELECT name, mark 
    FROM students 
    WHERE gender = "male"
    ''')\
    .show()

+----------------+----+
|            name|mark|
+----------------+----+
|       John Star|  85|
|  Arnold Walker |  55|
|       Alex John|  55|
|    Robert John |  78|
|       Lee Malva|  85|
|    Wookie Davey|  78|
|       Scott Row|  88|
|     Daniel Page|  88|
|  James Williams|  54|
|Martin Johnston |  75|
|     John Smith |  48|
|       Sam Adan |  79|
|   Nova Prescott|  78|
|  William Taylor|  88|
|   Laurin Wilson|  79|
|         Ben Day|  55|
|      Chris Ball|  88|
|   Garry Richard|  69|
+----------------+----+

