#### Section: Basics

In [2]:
# Earlier RDD-based approach, kept for reference: parse every line by hand and
# build a DataFrame from (age, numFriends) tuples.
#
# data = sc.textFile("/FileStore/tables/2isn93nr1478690799411/fake.csv")
#
# def parseLine(line):
#     fields = line.split(',')
#     age = int(fields[2])
#     numFriends = int(fields[3])
#     return (age, numFriends)
#
# rdd = data.map(parseLine)
# dataDF = sqlContext.createDataFrame(rdd)

# Read the CSV directly into a DataFrame. No header/inferSchema options are
# passed, so every column comes in as a string with an auto-generated name.
dataDF = sqlContext.read.load("/FileStore/tables/2isn93nr1478690799411/fake.csv", format="csv")

In [3]:
# look at the schema: the column names and types Spark assigned to the raw CSV
dataDF.printSchema()

In [4]:
dataDF.show(5)  # peek at the first five raw rows

In [5]:
# Give the raw columns readable names and cast age and friend count to
# integers (values that don't parse as int become null).
data = dataDF.select(
    dataDF['C1'].alias('name'),
    dataDF['C2'].cast('int').alias('age'),
    dataDF['C3'].cast('int').alias('fdcount'),
)

data.show(5)

In [6]:
# number of rows read from the CSV (equal to data.count(): the select in the
# previous cell keeps every row)
dataDF.count()

In [7]:
# How many people share each age; kept as a DataFrame because the plotting
# section reuses it.
ageCountDF = data.groupBy('age').count()

# a few of the groups younger than 30
ageCountDF.where(ageCountDF['age'] < 30).show(5)

In [8]:
# Total friend count for 31-year-olds. The aggregate's generated column name
# 'sum(fdcount)' is replaced with an identifier-friendly one.
(data
    .filter('age = 31')
    .groupBy('age')
    .agg({'fdcount': 'sum'})
    .withColumnRenamed('sum(fdcount)', 'sum_fdcount')
    .show())

In [9]:
# try a user defined function (UDF)
from pyspark.sql.types import StringType

def ageToCategory(age):
    """Map an age to a coarse category label.

    Args:
        age: the person's age as a number, or None when the value is missing
            (Spark hands SQL NULL to a Python UDF as None, e.g. for a value
            that failed the integer cast).

    Returns:
        'Young Adult' for ages up to and including 25, 'Adults' for ages above
        25, and 'NA' for a missing age or a value that compares neither way
        (such as NaN).
    """
    # Check for None first: on Python 3, `None <= 25` raises TypeError, which
    # would fail the Spark task instead of yielding 'NA'.
    if age is None:
        return 'NA'
    if age <= 25:
        return 'Young Adult'
    if age > 25:
        return 'Adults'
    # Only reachable for values like NaN, where both comparisons are False.
    return 'NA'

# `udf` is not imported anywhere else in this notebook (only StringType is).
from pyspark.sql.functions import udf

# Wrap the plain Python function so Spark can apply it per row. The declared
# return type must match the strings ageToCategory returns.
udfAgeToCategory = udf(ageToCategory, StringType())

# Keep the DataFrame itself. show() returns None, so chaining it into the
# assignment (as before) left ageCat bound to None.
ageCat = data.withColumn('ageCategory', udfAgeToCategory('age'))
ageCat.show(n=5)
          

#### Section: Joining

In [11]:
# Load the two small tables used by the join examples. The first line of each
# file is read as the header, and column types are inferred from the data.
dataLeft = (sqlContext.read.format("csv")
            .options(header='true', inferSchema='true')
            .load("/FileStore/tables/gk4vt2uj1478835411334/left_table.csv"))

dataRight = (sqlContext.read.format("csv")
             .options(header='true', inferSchema='true')
             .load("/FileStore/tables/lavmxkux1478835693877/right_table.csv"))

dataLeft.show()
dataRight.show()

In [12]:
# Inner join (the default join type). Joining on the column name 'name'
# produces a single 'name' column in the result.
dataLeft.join(dataRight, 'name')\
  .orderBy('name')\
  .show()

# If the merge keys have different names, join on a column expression instead.
# PySpark uses == (Scala's === does not exist in Python), and both key columns
# are kept in the result. 'other_name' is a placeholder for the right-hand key:
#dataLeft.join(dataRight, dataLeft['name'] == dataRight['other_name']).show()

In [13]:
# Left outer join: every row of dataLeft, with nulls where dataRight has no match.
(dataLeft.join(dataRight, on='name', how='left')
    .orderBy('name')
    .show())

In [14]:
# Right outer join: every row of dataRight, with nulls where dataLeft has no match.
(dataLeft.join(dataRight, on='name', how='right')
    .orderBy('name')
    .show())

In [15]:
# Full outer join: rows from both tables, with nulls wherever either side
# has no match.
(dataLeft.join(dataRight, on='name', how='outer')
    .orderBy('name')
    .show())

In [16]:
# Joins compose with the rest of the DataFrame API. Here the two tables are
# outer-joined, then age is averaged within each gender.
(dataLeft
    .join(dataRight, on='name', how='outer')
    .groupBy('gender')
    .agg({'age': 'avg'})
    .show())

#### Section: Plots

In [18]:
# Spark has no built-in plotting. Aggregate in Spark first, then pull the
# (small) result to the driver as a pandas DataFrame and plot it locally.
# Null-age groups (e.g. from values that failed the integer cast) are dropped
# so they don't become NaN bars or a NaN axis limit. Rows are sorted by age.
plotdata = ageCountDF.where('age IS NOT NULL').orderBy('age').toPandas()

In [19]:
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
import matplotlib.pyplot as plt
import matplotlib

# --- plot the data

fig, ax = plt.subplots(figsize=(8, 3.5))

# one bar per distinct age; bar height is the number of people with that age
ax.bar(plotdata['age'], plotdata['count'], width=0.8, color='#42a5f5',
       edgecolor='#1976d2', linewidth=2.0)

ax.set_xlabel('Age')
ax.set_ylabel('Count')

# Pad the x-range by one year on each side so the edge bars are not clipped.
# Series.min()/max() skip NaN, whereas the builtin min()/max() return an
# order-dependent (possibly NaN) result when a NaN age is present.
ax.set_xlim(plotdata['age'].min() - 1, plotdata['age'].max() + 1)

# --- title and display

fig.suptitle("Friend count by age")
fig.tight_layout(pad=2)

# display() is not defined in this cell; presumably the notebook environment's
# built-in renderer (e.g. Databricks) — confirm if running elsewhere.
display(fig)
