In [1]:
from pyspark.sql import SparkSession
import pyspark

In [2]:
spark = SparkSession.builder.appName('Basics').getOrCreate()

In [3]:
df = sqlContext.sql("SELECT * FROM people_json")

In [4]:
df.show()

In [5]:
df.printSchema()

In [6]:
df.columns

In [7]:
df.describe().show()

In [8]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [9]:
# setting up the data schema that you are expecting
data_schema = [StructField('age', IntegerType(), True),
              StructField('name', StringType(), True)]

In [10]:
final_struc = StructType(fields = data_schema)

In [11]:
df = spark.read.json('FileStore/tables/people.json', schema = final_struc)

In [12]:
df.printSchema()

In [13]:
# column object in spark
df['age']

In [14]:
type(df['age'])

In [15]:
df.select('age').show()

In [16]:
# DataFrame object in spark
type(df.select('age'))

In [17]:
df.head(2)

In [18]:
# row object in spark
df.head(2)[0]

In [19]:
type(df.head(2)[0])

In [20]:
# select multiple columns in spark
df.select(['age', 'name']).show()

In [21]:
# add or replace a column  ===> not an inplace operation
df.withColumn('double_age', df['age']*2).show()

In [22]:
df.show()

In [23]:
# rename column name, also not inplace!!
df.withColumnRenamed('age', 'my_new_age').show()

In [24]:
# create temporary view with SQL
df.createOrReplaceTempView('people')

In [25]:
results = spark.sql("SELECT * from people")

In [26]:
results.show()

In [27]:
# leverage the power of spark sql!
new_results = spark.sql("SELECT * FROM people WHERE age = 30")

In [28]:
new_results.show()

In [29]:
# spark DataFrame Basic Operations

In [30]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [31]:
df = spark.read.csv('FileStore/tables/appl_stock.csv', inferSchema = True, header = True)

In [32]:
df.printSchema()

In [33]:
df.show()

In [34]:
df.head(3)[0]

In [35]:
# use filter with SQL syntax
df.filter("Close < 500").show()

In [36]:
df.filter("Close < 500").select(['Open', 'Close']).show()

In [37]:
# use spark's own language instead of SQL syntax:
df.filter(df['Close'] < 500).select('Volume').show()

In [38]:
# if have multiple filtering conditions, use & between them and use () to wrap each condition!
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

In [39]:
df.filter(df['Low'] == 197.16).show()

In [40]:
# use collect() to save your filter result, and we can use the result in the future
result = df.filter(df['Low'] == 197.16).collect()

In [41]:
row = result[0]

In [42]:
# turn to dictionary
row.asDict()

In [43]:
# get one attribute value in the row
row.asDict()['Volume']

In [44]:
# groupby and aggregate functions with spark dataframes
spark = SparkSession.builder.appName('aggs').getOrCreate()
df = spark.read.csv('FileStore/tables/sales_info.csv', inferSchema = True, header = True)

In [45]:
df.show()

In [46]:
df.printSchema()

In [47]:
df.groupBy("Company")

In [48]:
df.groupBy('Company').mean().show()

In [49]:
df.groupBy('Company').count().show()

In [50]:
df.groupBy('Company').sum().show()
# other methods: min(), max(), ...

In [51]:
# to calculate sum/avg over all records, i.e., no need to groupBy:
df.agg({'Sales': 'max'}).show()
# in the dictionary{}: the key = column we are operating on, and the value = the operation that we will do. could be 'sum', 'min', 'max', ...


In [52]:
# similar syntax using groupBy:
group_data = df.groupBy("Company")
group_data.agg({'Sales': 'max'}).show()

In [53]:
# import functions from spark
from pyspark.sql.functions import countDistinct, avg, stddev

In [54]:
df.select(countDistinct('Sales')).show()

In [55]:
df.select(avg('Sales')).show()

In [56]:
# use alias in the select query
df.select(avg('Sales').alias('Average Sales')).show()

In [57]:
df.select(stddev('Sales')).show()

In [58]:
# format the output number
from pyspark.sql.functions import format_number
sales_std = df.select(stddev('Sales').alias('std'))
sales_std.show()
df.select(format_number(stddev('Sales'), 2).alias('std')).show()

In [59]:
sales_std.select(format_number('std', 2).alias('std')).show()

In [60]:
# combine the above in one line code:
df.select(format_number(stddev('Sales'), 2).alias('std')).show()

In [61]:
# orderBy (just call the string column name directly for ascending order)
df.orderBy('Sales').show()

In [62]:
# descending order: (do not call the column name directly if want to sort in descending order, but need to pass the column object itself)
df.orderBy(df['Sales'].desc()).show()

In [63]:
# Spark DataFrames Missing Data
spark = SparkSession.builder.appName('miss').getOrCreate()

In [64]:
df = spark.read.csv('FileStore/tables/ContainsNull.csv', inferSchema = True, header = True)

In [65]:
df.show()

In [66]:
# drop missing data
# drop any row with missing data
df.na.drop().show()

In [67]:
# drop a row if it contains at least 2 missing data (using thresh)
df.na.drop(thresh=2).show()

In [68]:
# drop a row if any cell in the row is null
df.na.drop(how='any').show()

In [69]:
# drop a row if all cells in the row are null
df.na.drop(how='all').show()

In [70]:
# drop the row if 'Sales' cell is null
df.na.drop(subset = ['Sales']).show()

In [71]:
# fill the missing data
df.printSchema()

In [72]:
# fill the string values (spark will fill the missing string values automatically)
df.na.fill('FILL VALUE').show()

In [73]:
# fill the numeric values
df.na.fill(0).show()

In [74]:
# target a column to fill
df.na.fill('No Name', subset = ['Name']).show()

In [75]:
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()

In [76]:
mean_val[0]

In [77]:
mean_sales = mean_val[0][0]

In [78]:
# fill in the missing sales values with mean sales values
df.na.fill(mean_sales, subset = ['Sales']).show()

In [79]:
# fill in with the mean value with one-line code (does the same job)
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], subset = ['Sales']).show()

In [80]:
# Dates and Timestamps
spark = SparkSession.builder.appName('dates').getOrCreate()
df = spark.read.csv('FileStore/tables/appl_stock.csv', inferSchema = True, header = True)

In [81]:
df.head(1)

In [82]:
df.select(['Date','Open']).show()

In [83]:
# extract useful information from daytime and timestamp data
from pyspark.sql.functions import (dayofmonth, hour, 
                                   dayofyear, month, year,
                                   weekofyear, format_number,
                                   date_format)

In [84]:
df.select(dayofmonth(df['Date'])).show()

In [85]:
df.select(hour(df['Date'])).show()

In [86]:
df.select(month(df['Date'])).show()

In [87]:
# calculate the average closing price by year
df.select(year(df['Date'])).show()

In [88]:
# create a Year column in the df and save the df
newdf = df.withColumn("Year", year(df['Date']))

In [89]:
# get the mean close price for each year
result = newdf.groupBy("Year").mean("Close")

In [90]:
result.show()

In [91]:
# format and order the result
result.select(['Year', format_number('avg(Close)',2).alias('Average Closing Price')]).show()