#### Find your Spark installation

In [None]:
import findspark

# Locate the Spark installation under /opt/spark so that the pyspark imports
# in the following cells can be resolved.
findspark.init(spark_home='/opt/spark')

#### Import Spark SQL and start a session

In [None]:
import pandas as pd
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (getOrCreate reuses an existing
# session if one is already running).
spark = (
    SparkSession.builder
    .appName("my_first_spark_app")
    .getOrCreate()
)

#### Load data from file

In [None]:
# Read the CSV using its first row as column names. No schema is supplied,
# so the columns come in as strings (converted to numbers further down).
df = spark.read.csv(path="energy_data.csv", header=True)

In [None]:
# Show the first rows of the data (like pandas head()). This is an action,
# so it forces evaluation of the previous commands. 20 rows is the default.
df.show(n=20)

In [None]:
# Print the schema (column names and types). Every column is still a string
# at this point because the CSV was read without a schema.
df.printSchema()

In [None]:
# Add a derived column 'Wall2' = Wall_Area + 2.
# NOTE: Wall_Area is still a string column here (it is only cast to a number
# in the next section), so this addition relies on Spark's implicit coercion.
df = df.withColumn('Wall2', df['Wall_Area'] + 2)

### Columns are strings, but we need to convert to numbers

In [None]:
# import the float type from spark sql
from pyspark.sql.types import FloatType

# convert one column
df = df.withColumn("Wall_Area", df["Wall_Area"].cast(FloatType()))

# convert all columns in a single projection. Calling withColumn once per
# column in a loop stacks one projection per call, which the PySpark docs
# warn can build very large query plans; select() does the same conversion
# in one step and keeps the original column order and names.
df = df.select(
    [df[col_name].cast(FloatType()).alias(col_name) for col_name in df.columns]
)

In [None]:
# Check the schema again: the columns should now be float rather than string.
df.printSchema()

In [None]:
# Drop every row that has a null in any column.
df = df.na.drop()

### some built-in functions

In [None]:
# select the Orientation and Roof_area columns by name
df2 = df.select('Orientation', 'Roof_area')

In [None]:
# keep only the rows whose roof area is greater than 200
df2 = df2.where(df2.Roof_area > 200)

### Aggregating functions

In [None]:
# calculate the mean roof area for each orientation
df2 = df2.groupBy("Orientation").mean("Roof_area")

In [None]:
# Only when show() -- an action -- is called are the previous
# transformations actually executed. 20 rows is the default.
df2.show(n=20)

### User-defined function

In [None]:
from pyspark.sql.functions import udf

# define a python function (wrapped as a Spark UDF below)
def square_float(x):
    """Return ``x`` squared as a float, or ``None`` when ``x`` is ``None``.

    Spark hands SQL nulls to Python UDFs as ``None``; without this guard
    ``None ** 2`` raises TypeError and fails the whole job.
    """
    if x is None:
        return None
    return float(x ** 2)

# register a spark udf that wraps the python function directly (a lambda
# wrapper adds nothing); the udf's return type is FloatType
square_udf = udf(square_float, FloatType())

In [None]:
# create new column 'Wall_Area2' holding the udf result (Wall_Area squared)
df3 = df.withColumn("Wall_Area2", square_udf(df["Wall_Area"]))

In [None]:
# display the first rows of df3 (20 is the default row count)
df3.show(n=20)

### SQL

In [None]:
# expose the spark df to SQL queries as the temporary view "energy"
df.createOrReplaceTempView(name="energy")

In [None]:
# build a SQL query against the "energy" view; spark.sql only defines it,
# the query runs when an action (like show) is called
query = "SELECT * FROM energy WHERE Roof_area > 200"
df_SQL = spark.sql(query)

In [None]:
# show the result - only now will the SQL query be executed
# (20 rows is the default)
df_SQL.show(n=20)

### Now we convert the result to a pandas dataframe. This action causes all the previous transformations (which have not yet been executed) to run before outputting the new pandas dataframe

In [None]:
# Convert df2 into a pandas DataFrame. This is an action, and the whole result
# is loaded into local memory, so only do this for small results.
df_pd = df2.toPandas()

### Alternatively, we could output the result to a CSV; this is also an action and will cause our transformations to be executed

In [None]:
# This writes df2 as CSV into a folder called 'energy_data2': each partition
# of the data becomes its own part-file inside that folder.
# header=True writes the column names, matching how energy_data.csv was read
# (header=True), so the output can be read back the same way.
# Note: the default save mode raises an error if the folder already exists,
# so delete it before re-running this cell.
df2.write.csv("energy_data2", header=True)

In [None]:
# This combines all partitions into one, so a single CSV part-file is written
# into the folder called 'energy_data2_single'. header=True writes the column
# names, so the file can be read back with header=True like energy_data.csv.
# Beware if you are writing a large dataset:
# all the data is funnelled through a single task and might crash your Spark session.

df2.coalesce(1).write.csv("energy_data2_single", header=True)

### END

In [None]:
# Stop the Spark session (and its underlying SparkContext) now that we are done.
spark.stop()