# Reference: 
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

In [None]:
from pyspark import SparkContext
# getOrCreate() reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [2]:
# Calculate the average count per city from the following data using a Spark RDD
rdd = sc.parallelize([('Barcelona',2), ('Rome',3), ('Paris',4), ('Vegas',5), ('Barcelona', 8), ('Vegas',9), ('Rome',3)])
rdd.collect()

[('Barcelona', 2),
 ('Rome', 3),
 ('Paris', 4),
 ('Vegas', 5),
 ('Barcelona', 8),
 ('Vegas', 9),
 ('Rome', 3)]

In [3]:
# Pair every count with an occurrence counter of 1: (city, [count, 1])
rdd_mapped = rdd.map(lambda pair: (pair[0], [pair[1], 1]))
rdd_mapped.collect()

[('Barcelona', [2, 1]),
 ('Rome', [3, 1]),
 ('Paris', [4, 1]),
 ('Vegas', [5, 1]),
 ('Barcelona', [8, 1]),
 ('Vegas', [9, 1]),
 ('Rome', [3, 1])]

In [4]:
# Sum the counts and the occurrence counters per city. The reducer returns a
# list, matching the [count, 1] values built by the map step: reduceByKey never
# calls the reducer for a key with a single value (e.g. Paris), so returning a
# tuple here would leave a mix of lists and tuples in the result.
rdd_reduced = rdd_mapped.reduceByKey(lambda a, b: [a[0] + b[0], a[1] + b[1]])
rdd_reduced.collect()

[('Barcelona', [10, 2]),
 ('Paris', [4, 1]),
 ('Vegas', [14, 2]),
 ('Rome', [6, 2])]

In [5]:
# Divide each city's summed counts by its number of occurrences
rdd_average = rdd_reduced.map(lambda entry: (entry[0], entry[1][0] / entry[1][1]))
rdd_average.collect()

[('Barcelona', 5.0), ('Paris', 4.0), ('Vegas', 7.0), ('Rome', 3.0)]

## Dataframes / SQL 

In [6]:
from pyspark.sql import SparkSession

# Obtain the SparkSession (reusing an existing one if present), labelled with an app name
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [7]:
from pyspark import Row
rdd_for_df = rdd.map(lambda item: Row(city=item[0], counts=item[1]))
rdd_for_df.collect()

[Row(city='Barcelona', counts=2),
 Row(city='Rome', counts=3),
 Row(city='Paris', counts=4),
 Row(city='Vegas', counts=5),
 Row(city='Barcelona', counts=8),
 Row(city='Vegas', counts=9),
 Row(city='Rome', counts=3)]

In [8]:
# Convert the rdd to dataframe
df_city = rdd_for_df.toDF()
df_city.show()

+---------+------+
|     city|counts|
+---------+------+
|Barcelona|     2|
|     Rome|     3|
|    Paris|     4|
|    Vegas|     5|
|Barcelona|     8|
|    Vegas|     9|
|     Rome|     3|
+---------+------+



In [9]:
df_city.collect()

[Row(city='Barcelona', counts=2),
 Row(city='Rome', counts=3),
 Row(city='Paris', counts=4),
 Row(city='Vegas', counts=5),
 Row(city='Barcelona', counts=8),
 Row(city='Vegas', counts=9),
 Row(city='Rome', counts=3)]

In [10]:
# use the dataframe's native functions
# NOTE: multiple DataFrame operations can be chained together in a single expression
df_city.groupBy('city').avg('counts').show()

+---------+-----------+
|     city|avg(counts)|
+---------+-----------+
|    Vegas|        7.0|
|    Paris|        4.0|
|Barcelona|        5.0|
|     Rome|        3.0|
+---------+-----------+



In [11]:
# alternatively, you could use a sql query
df_city.createOrReplaceTempView("CityTable")

In [12]:
# Compute the per-city average with a SQL query against the CityTable temp view.
# TODO: give the aggregate a column alias (e.g. `AVG(counts) AS avg_counts`)
avg_query = spark.sql("SELECT city, AVG(counts) FROM CityTable GROUP BY city")
avg_query.show()

+---------+-----------+
|     city|avg(counts)|
+---------+-----------+
|    Vegas|        7.0|
|    Paris|        4.0|
|Barcelona|        5.0|
|     Rome|        3.0|
+---------+-----------+



### MULTIPLE DATA SOURCES

In [13]:
df_people_1 = spark.read.json("data/people_1.json")
df_people_1

DataFrame[age: bigint, name: string]

In [14]:
df_people_1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [15]:
df_people_1.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [16]:
# create additional data
rdd_people_2 = sc.parallelize([Row(name="Alice", age=45), Row(name="Bran", age=15)])
df_people_2 = rdd_people_2.toDF()

In [17]:
df_people_2.show()

+---+-----+
|age| name|
+---+-----+
| 45|Alice|
| 15| Bran|
+---+-----+



In [18]:
df_people_2.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [19]:
# Write the DataFrame as a parquet file. The default save mode raises an error
# when the target path already exists, so overwrite it to keep this cell re-runnable.
df_people_2.write.mode("overwrite").parquet("data/people2.parquet")

In [20]:
# optional: read the file back to see if you could load the file back
df_people_2 = spark.read.parquet("data/people2.parquet")
df_people_2

DataFrame[age: bigint, name: string]

In [22]:
df_people_2.show()
df_people_1.show()

+---+-----+
|age| name|
+---+-----+
| 45|Alice|
| 15| Bran|
+---+-----+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [23]:
# Combine the data. union() matches columns by position, not by name, so
# select df_people_2's columns in df_people_1's order first; otherwise a
# different column order would silently put names in the age column.
df_people = df_people_1.union(df_people_2.select(df_people_1.columns))
df_people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|  45|  Alice|
|  15|   Bran|
+----+-------+



In [24]:
df_people_filtered = df_people.filter(df_people['age'] >21)
df_people_filtered

DataFrame[age: bigint, name: string]

In [25]:
df_people_filtered.show()

+---+-----+
|age| name|
+---+-----+
| 30| Andy|
| 45|Alice|
+---+-----+



In [26]:
# Overwrite any previous export: the default save mode raises an error when the
# target path already exists, which breaks re-running the notebook.
df_people_filtered.write.mode("overwrite").parquet("data/people_filtered.parquet")

### Create DF from RDD which is created from a text file

In [27]:
lines = spark.sparkContext.textFile("data/people.txt")  # Read the file

In [28]:
rdd_noschema = lines.map(lambda x: x.split(','))
rdd_noschema.collect()

[['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]

In [29]:
df_noschema = spark.createDataFrame(rdd_noschema)
df_noschema

DataFrame[_1: string, _2: string]

In [30]:
df_noschema.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [31]:
df_noschema.show()

+-------+---+
|     _1| _2|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



#### Provide only field names, no field types

In [32]:
def infer_mapper(line):
    """Turn a comma-separated ``name,age`` text line into a ``Row``.

    The first field is kept as the name string and the second is converted
    with ``int``; any further fields are ignored.
    """
    parts = line.split(',')
    person_name = parts[0]
    person_age = int(parts[1])
    return Row(name=person_name, age=person_age)

In [33]:
from pyspark import Row
rdd_headers = lines.map(infer_mapper)
rdd_headers.collect()

[Row(age=29, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [34]:
df_headers = spark.createDataFrame(rdd_headers)
df_headers

DataFrame[age: bigint, name: string]

In [35]:
df_headers.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [36]:
df_headers.show()

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+



#### Provide the schema with field names and field types

In [37]:
from pyspark.sql.types import *
fields = [StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
schema = StructType(fields)

In [38]:
def schema_mapper(line):
    """Parse a comma-separated ``name,age`` text line into a ``(name, age)`` tuple.

    The name is returned as-is and the age is converted with ``int``; any
    further fields are ignored.
    """
    parts = line.split(',')
    name, age_text = parts[0], parts[1]
    return (name, int(age_text))

In [39]:
rdd_schema = lines.map(schema_mapper)

In [40]:
df_schema = spark.createDataFrame(rdd_schema, schema)
df_schema

DataFrame[name: string, age: int]

In [41]:
df_schema.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [42]:
df_schema.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [52]:
df_people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|  45|  Alice|
|  15|   Bran|
+----+-------+



#### User Exercises

In [45]:
# List all the people whose names start with A
df1_people = df_people.filter(df_people['name'].startswith('A'))
df1_people.show()

+---+-----+
|age| name|
+---+-----+
| 30| Andy|
| 45|Alice|
+---+-----+



In [63]:
# list all people whose age is null
df2_people = df_people.filter(df_people.age.isNull())
df2_people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [75]:
# replace null values with 0
df_people.fillna(0,'age').show()

+---+-------+
|age|   name|
+---+-------+
|  0|Michael|
| 30|   Andy|
| 19| Justin|
| 45|  Alice|
| 15|   Bran|
+---+-------+



In [78]:
# replace Andy's age (30) with the value 10
# NOTE: replace() matches on the value, so every row whose age is 30 is changed, not only Andy's
df_people.replace(30,10,'age').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  10|   Andy|
|  19| Justin|
|  45|  Alice|
|  15|   Bran|
+----+-------+



In [79]:
# replace names justin and bran with j and b respectively
# hint: you can pass a list to replace multiple values
df_people.replace(['Justin','Bran'],['J','B'],'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19|      J|
|  45|  Alice|
|  15|      B|
+----+-------+



In [86]:
# List all people whose ages are strictly between 20 and 40
df_people.filter((df_people.age > 20) & (df_people.age < 40)).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [90]:
df_city.show()

+---------+------+
|     city|counts|
+---------+------+
|Barcelona|     2|
|     Rome|     3|
|    Paris|     4|
|    Vegas|     5|
|Barcelona|     8|
|    Vegas|     9|
|     Rome|     3|
+---------+------+



In [125]:
# Total the counts for each city and list the cities from the largest total to the smallest
(
    df_city.groupBy('city')
    .sum('counts')
    .orderBy('sum(counts)', ascending=False)
    .show()
)

+---------+-----------+
|     city|sum(counts)|
+---------+-----------+
|    Vegas|         14|
|Barcelona|         10|
|     Rome|          6|
|    Paris|          4|
+---------+-----------+



## Alternative solution that gives the aggregated column an explicit name
### With the `agg` function you can express much more complex aggregations

In [127]:
import pyspark.sql.functions as sf

# Keep the aggregated DataFrame in df_sum and display it in a separate step:
# show() only prints and returns None, so assigning its result would leave
# df_sum set to None instead of the DataFrame.
df_sum = df_city.groupBy('city').agg(sf.sum('counts').alias('sum_counts'))
df_sum.show()

+---------+----------+
|     city|sum_counts|
+---------+----------+
|    Vegas|        14|
|    Paris|         4|
|Barcelona|        10|
|     Rome|         6|
+---------+----------+



In [126]:
# list the number of entries (rows) for each city
df_city.groupBy('city').count().show()

+---------+-----+
|     city|count|
+---------+-----+
|    Vegas|    2|
|    Paris|    1|
|Barcelona|    2|
|     Rome|    2|
+---------+-----+

