In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session used by this walkthrough (app name 'Basics').
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)
# Read the people JSON file; no explicit schema is supplied on this read.
df = spark.read.json('/FileStore/tables/people.json')

In [3]:
# Print the DataFrame's rows as a text table.
df.show()

In [4]:
 df.printSchema()

In [5]:
# List the DataFrame's column names.
df.columns

In [6]:
# describe() builds a summary DataFrame for df; show() prints it.
df.describe().show()

In [7]:
# import type tools
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)

In [8]:
# Describe each column with a StructField(name, data type, nullable flag).
# nullable=True declares that the column is allowed to contain nulls.
#
#   'age'  -> IntegerType (replacing the long type it had on the first read)
#   'name' -> StringType
data_schema = [
    StructField(name='age', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
]

In [9]:
# Wrap the list of fields in a StructType, which is the schema object the reader accepts.
final_struc = StructType(data_schema)

In [10]:
# Re-read the same JSON file, this time telling the reader to use final_struc
# as the schema instead of working one out from the data.
df = spark.read.schema(final_struc).json('/FileStore/tables/people.json')

In [11]:
# Print the schema again to confirm the explicitly supplied types were applied.
df.printSchema()

In [12]:
# Indexing the DataFrame with a column name (df['column']) gives back a column
# object rather than data; type() shows which class that object is.
type(df['age'])

In [13]:
# select() with a column name returns a DataFrame holding just that column,
# which can then be printed with show().
df.select('age').show()

In [14]:
# Inspect a single Row object.
# first() hands back the leading row directly (None for an empty DataFrame),
# so there is no need to fetch two rows and index into the resulting list.
df.first()

In [15]:
# Show the class of a row object.
# first() avoids fetching two rows just to look at one, and does not raise
# IndexError when the DataFrame is empty (it returns None instead).
type(df.first())

In [16]:
# withColumn(new_name, column_expression) yields a DataFrame with the extra
# column added; here 'double_age' holds twice each age. The result is only
# shown, not assigned, so df itself is left as it was.
(
    df.withColumn('double_age', df['age'] * 2)
    .show()
)

In [17]:
# Show the DataFrame with column 'age' renamed to 'my_new_age'.
df.withColumnRenamed(existing='age', new='my_new_age').show()

In [18]:
# Register the DataFrame as a temporary view named 'people' so that it can be
# queried by name through spark.sql().
df.createOrReplaceTempView('people')

In [19]:
# Run a SQL query against the 'people' view; the query result is printed with show().
results = spark.sql('SELECT * FROM people')
results.show()

In [20]:
# Same view, restricted by a WHERE clause to rows whose age equals 30.
new_results = spark.sql('SELECT * FROM people WHERE age=30')
new_results.show()

In [21]:
# Session for the filtering examples (requested app name 'ops').
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)
# Load the stock CSV: the first line is treated as the header and column
# types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [22]:
# Print the column names and inferred types of the stock data.
df.printSchema()

In [23]:
# Print the stock data as a text table.
df.show()

In [24]:
# Fetch the first three rows of the stock data.
df.head(3)

In [25]:
# filter() also accepts a SQL-style condition written as a string.
# Keep rows with Close below 500 and display only Open and Close.
(
    df.filter('Close < 500')
    .select('Open', 'Close')
    .show()
)

In [26]:
# Column-expression form of the filter: rows whose Close is below 500,
# displaying only Volume. The column is spelled 'Close', matching the other
# references in this file, so the lookup does not depend on case-insensitive
# column-name resolution being enabled.
df.filter(df['Close'] < 500).select('Volume').show()

In [27]:
# Combine two conditions: & is logical AND and ~ is logical NOT. Each
# comparison needs its own parentheses because & and ~ bind tighter than < and >.
# Keeps rows where Close is below 200 and Open is not above 200.
# The column is spelled 'Open', matching the other references in this file,
# rather than relying on case-insensitive column-name resolution.
df.filter((df['Close'] < 200) & ~(df['Open'] > 200)).show()

In [28]:
# collect() returns the matching rows as a Python list of Row objects
# (rather than printing them, as show() does).
# NOTE(review): this is an exact floating-point comparison against 197.16 —
# confirm the value matches the data as parsed.
result = df.filter(df['Low'] == 197.16).collect()
result

In [29]:
# Take the first collected row.
# NOTE(review): raises IndexError if the filter above matched no rows.
row = result[0]

In [30]:
# Convert the Row into a plain Python dict.
row.asDict()

In [31]:
# Look up a single value in that dict by its key ('Volume').
row.asDict()['Volume']

In [32]:
# Session for the aggregation examples (requested app name 'aggs').
spark = (
    SparkSession.builder
    .appName('aggs')
    .getOrCreate()
)
# Load the sales CSV with a header row and inferred column types.
df = spark.read.csv(
    '/FileStore/tables/sales_info.csv',
    header=True,
    inferSchema=True,
)

In [33]:
# Print the sales data as a text table.
df.show()

In [34]:
# Print the column names and inferred types of the sales data.
df.printSchema()

In [35]:
# groupBy() on its own only yields a grouped-data object; nothing is
# aggregated until an aggregation method is called on it.
df.groupBy('Company')

In [36]:
# Applying mean() to the grouped data gives a DataFrame object; without
# show() the cell displays that object rather than its rows.
df.groupBy('Company').mean()

In [37]:
# Per-company mean, printed as a table.
(
    df.groupBy('Company')
    .mean()
    .show()
)

In [38]:
# Number of rows per company, printed as a table.
(
    df.groupBy('Company')
    .count()
    .show()
)

In [39]:
# agg() called directly on the DataFrame (no grouping) with a
# {column: function} dict: the sum of Sales over all rows.
df.agg({'Sales':'sum'}).show()

In [40]:
# Keep the grouped-by-Company object so aggregations can be applied to it later.
group_data = df.groupBy('Company')

In [41]:
# agg() with a {column: function} dict is the general form of the shortcut
# methods such as mean() and count(); here it is applied to the grouped
# data to get the maximum Sales per Company.
group_data.agg({'Sales':'max'}).show()

In [42]:
# Import function from spark
from pyspark.sql.functions import countDistinct, avg, stddev

In [43]:
# Count the distinct Sales values and label the output column 'Count sales'.
df.select(
    countDistinct('Sales').alias('Count sales')
).show()

In [44]:
# Standard deviation of the Sales column, printed unformatted.
df.select(stddev('Sales')).show()

In [45]:
from pyspark.sql.functions import format_number

# Standard deviation of Sales, kept as a one-column DataFrame whose column is named 'std'.
sales_std = df.select(
    stddev('Sales').alias('std')
)
# Print that value formatted to two decimal places.
sales_std.select(
    format_number('std', 2)
).show()

In [46]:
# Same two-decimal formatting, with the output column labelled 'std'.
sales_std.select(
    format_number('std', 2).alias('std')
).show()

In [47]:
# Print the rows sorted by Sales (orderBy with a bare column name).
df.orderBy('Sales').show()

In [48]:
# Print the rows sorted by Sales from highest to lowest.
df.orderBy('Sales', ascending=False).show()

In [49]:
# Session for the date-function examples (requested app name 'dates').
spark = (
    SparkSession.builder
    .appName('dates')
    .getOrCreate()
)
# Load the stock CSV again with a header row and inferred column types.
df = spark.read.csv(
    '/FileStore/tables/appl_stock.csv',
    header=True,
    inferSchema=True,
)

In [50]:
# Print the stock data as a text table.
df.show()

In [51]:
# Display only the Date and Open columns.
df.select('Date', 'Open').show()

In [52]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format

In [53]:
# Extract the day-of-month from each Date value.
df.select(dayofmonth(df['Date'])).show()

In [54]:
# Extract the hour from each Date value.
# NOTE(review): if Date carries no time-of-day this is presumably 0 for every row — confirm.
df.select(hour(df['Date'])).show()

In [55]:
# Extract the month from each Date value.
df.select(month(df['Date'])).show()

In [56]:
# Derive a 'Year' column from Date and keep the result as a new DataFrame,
# so the year can be used as a grouping key below.
newdf = df.withColumn("Year", year(df['Date']))

In [57]:
# Mean per year, printed as a table.
(
    newdf.groupBy('Year')
    .mean()
    .show()
)

In [58]:
# Yearly means, narrowed to the year and the averaged closing price
# (the aggregate column is named 'avg(Close)').
result = (
    newdf.groupBy('Year')
    .mean()
    .select('Year', 'avg(Close)')
)

In [59]:
# Show the yearly result with a friendlier header for the average column.
(
    result
    .withColumnRenamed('avg(Close)', 'Average Closing Price')
    .show()
)

In [60]:
# Rename the average column, then print it formatted to two decimal places
# under the header 'Average Close'.
new = result.withColumnRenamed('avg(Close)', 'Average Closing Price')
new.select(
    'Year',
    format_number('Average Closing Price', 2).alias('Average Close'),
).show()