In [None]:
from pyspark import SparkContext
# Reuse the running SparkContext when this cell is executed again; calling
# SparkContext() a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [None]:
# Sample (city, count) pairs; the goal is the average count per city using the RDD API.
rdd = sc.parallelize([
    ('Barcelona', 2),
    ('Rome', 3),
    ('Paris', 4),
    ('Vegas', 5),
    ('Barcelona', 8),
    ('Vegas', 9),
    ('Rome', 3),
])
rdd.collect()

In [None]:
# Pair every count with an occurrence counter of 1: (city, (count, 1)).
# The value is a tuple (not a list) so it has the same type that the
# reduceByKey step produces; with a list, a city that appears only once
# would keep its list while reduced cities become tuples.
rdd_mapped = rdd.map(lambda item: (item[0], (item[1], 1)))
rdd_mapped.collect()

In [None]:
# Add the (sum, occurrences) pairs of each city component-wise.
rdd_reduced = rdd_mapped.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
rdd_reduced.collect()

In [None]:
# Average per city = summed counts divided by the number of occurrences.
rdd_average = rdd_reduced.map(
    lambda city_totals: (city_totals[0], city_totals[1][0] / city_totals[1][1])
)
rdd_average.collect()

## Dataframes / SQL 

In [None]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used by the DataFrame and SQL examples
# (getOrCreate returns the existing session if there is one).
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [None]:
from pyspark import Row
# Wrap each (city, count) pair in a Row so its fields carry the column names
# "city" and "counts".
rdd_for_df = rdd.map(lambda pair: Row(city=pair[0], counts=pair[1]))
rdd_for_df.collect()

In [None]:
# Build a DataFrame from the RDD of Rows and display its contents.
df_city = spark.createDataFrame(rdd_for_df)
df_city.show()

In [None]:
# Use the DataFrame's built-in aggregation functions.
# NOTE: several DataFrame operations can be chained into a single expression.
(
    df_city
    .groupBy('city')
    .avg('counts')
    .show()
)

In [None]:
# Alternatively, use a SQL query: register the DataFrame as a temporary view
# named "CityTable" so that spark.sql() can refer to it.
df_city.createOrReplaceTempView("CityTable")

In [None]:
# Average counts per city via SQL. The aggregate is aliased so the result
# column gets a plain name (avg_counts) instead of the generated "avg(counts)".
avg_query = spark.sql(
    "SELECT city, AVG(counts) AS avg_counts FROM CityTable GROUP BY city"
)
avg_query.show()

### MULTIPLE DATA SOURCES

In [None]:
# Load the first people data set from a JSON file into a DataFrame.
df_people_1 = spark.read.json("data/people_1.json")
df_people_1

In [None]:
# Display the rows loaded from the JSON file.
df_people_1.show()

In [None]:
# Print the column names and types of the JSON-backed DataFrame.
df_people_1.printSchema()

In [None]:
# Build a second people data set in memory and convert it to a DataFrame.
rdd_people_2 = sc.parallelize([
    Row(name="Alice", age=45),
    Row(name="Bran", age=15),
])
df_people_2 = rdd_people_2.toDF()

In [None]:
# Write the DataFrame as a parquet file. mode("overwrite") replaces output left
# behind by a previous run; the default save mode raises an error when the path
# already exists, which would make this cell fail the second time the notebook
# is run.
df_people_2.write.mode("overwrite").parquet("data/people2.parquet")

In [None]:
# Optional: read the parquet file back to confirm that it can be loaded.
# Note that this rebinds df_people_2 to the DataFrame read from disk.
df_people_2 = spark.read.parquet("data/people2.parquet")
df_people_2

In [None]:
# Display the rows of the second people data set.
df_people_2.show()

In [None]:
# Combine the two data sets. unionByName matches columns by name, whereas
# union() matches them purely by position: if the two DataFrames list their
# columns in a different order (e.g. age/name vs. name/age), union() would
# silently put names into the age column.
df_people = df_people_1.unionByName(df_people_2)
df_people.show()

In [None]:
# Keep only the people whose age is greater than 21.
df_people_filtered = df_people.filter(df_people.age > 21)
df_people_filtered

In [None]:
# Display the people that passed the age filter.
df_people_filtered.show()

In [None]:
# Persist the filtered people as parquet. mode("overwrite") replaces output
# from a previous run; the default save mode raises an error when the path
# already exists, so the notebook could not be run twice.
df_people_filtered.write.mode("overwrite").parquet("data/people_filtered.parquet")

### Create DF from RDD which is created from a text file

In [None]:
# Read the text file as an RDD of lines (one string per line of the file).
lines = spark.sparkContext.textFile("data/people.txt")

In [None]:
# Split every text line on commas; each record becomes a plain list of strings
# with no field names or types attached.
rdd_noschema = lines.map(lambda line: line.split(','))
rdd_noschema.collect()

In [None]:
# Create a DataFrame without supplying any schema information; Spark has to
# come up with column names and types on its own.
df_noschema = spark.createDataFrame(rdd_noschema)
df_noschema

In [None]:
# Inspect the column names and types Spark assigned when no schema was given.
df_noschema.printSchema()

In [None]:
# Display the rows of the schema-less DataFrame.
df_noschema.show()

#### Provide only field names, no field types

In [None]:
def infer_mapper(line):
    """Convert one comma-separated text line into a Row with named fields.

    The first field becomes ``name`` (kept as a string) and the second field
    becomes ``age`` (converted with ``int``). Any further fields are ignored.

    Raises:
        IndexError: if the line contains no comma.
        ValueError: if the second field is not a valid integer.
    """
    parts = line.split(',')
    person_name = parts[0]
    person_age = int(parts[1])
    return Row(name=person_name, age=person_age)

In [None]:
from pyspark import Row
# Parse every line into a Row(name=..., age=...) so the records carry field
# names, with age converted to an int by infer_mapper.
rdd_headers = lines.map(infer_mapper)
rdd_headers.collect()

In [None]:
# Create the DataFrame from the RDD of Rows; only field names are provided
# (through the Row fields), no explicit column types.
df_headers = spark.createDataFrame(rdd_headers)
df_headers

In [None]:
# Inspect the schema Spark derived from the Row objects.
df_headers.printSchema()

In [None]:
# Display the rows of the DataFrame built from named Rows.
df_headers.show()

#### Provide the schema with field names and field types

In [None]:
from pyspark.sql.types import *
# Explicit schema: a string column "name" and an integer column "age";
# the third StructField argument (True) marks both columns as nullable.
fields = [
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
]
schema = StructType(fields)

In [None]:
def schema_mapper(line):
    """Convert one comma-separated text line into a ``(name, age)`` tuple.

    The first field is returned unchanged as the name; the second field is
    converted with ``int``. Any further fields are ignored.

    Raises:
        IndexError: if the line contains no comma.
        ValueError: if the second field is not a valid integer.
    """
    parts = line.split(',')
    name, age_text = parts[0], parts[1]
    return (name, int(age_text))

In [None]:
# Parse every line into a plain (name, age) tuple; the column names and types
# are supplied separately through the explicit schema.
rdd_schema = lines.map(schema_mapper)

In [None]:
# Create the DataFrame from the tuples, applying the explicit schema
# (field names and field types) defined above.
df_schema = spark.createDataFrame(rdd_schema, schema)
df_schema

In [None]:
# Confirm that the DataFrame reports the explicitly supplied schema.
df_schema.printSchema()

In [None]:
# Display the rows of the DataFrame built with the explicit schema.
df_schema.show()

#### User Exercises

In [None]:
# List all the people whose names start with A


In [None]:
# list all people whose age is null


In [None]:
# replace null values with 0


In [None]:
# replace the age of andy with a value 10


In [None]:
# replace names justin and bran with j and b respectively
# hint: you can pass a list to replace multiple values


In [None]:
# List all people whose ages are between 20 and 40


In [None]:
df_city.show()
# Exercise: list the sum of the counts for each city, ordered by that sum.


In [None]:
# List how many count entries there are for each city
