# Spark Notes for Beginners
Useful links:
* https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html
* https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html

## Spark Start-up and Installation

In [None]:
## Spark Start-up

from pyspark import SparkContext
sc = SparkContext()  # low-level entry point (RDD API)

from pyspark.sql import SparkSession
## `builder` is an attribute of the SparkSession class itself: use SparkSession.builder,
## not SparkSession().builder (calling SparkSession() without a SparkContext raises a TypeError)
spark = SparkSession.builder.appName("Spark_notes").getOrCreate()

## header=True: the first line holds the column names; inferSchema=True: column types are inferred from the data
df = spark.read.csv("/home/jovyan/work/data/iris.csv", inferSchema=True, header=True) #spark.read also offers: json, table, parquet


## Spark Basic Functions

In [None]:
##  Spark APIs

df.show() #prints the first rows as a text table (or z.show(df))

## to see the column info (name, data type, nullable)
df.printSchema()

## to see just the column names (a plain Python list of strings)
df.columns

## describe the dataframe like pandas (summary statistics per column)
df.describe().show()


In [None]:
## Spark Datatypes

from pyspark.sql.types import StringType,StructField,IntegerType,StructType, FloatType, DoubleType, NullType, BooleanType, DateType, TimestampType, BinaryType, ArrayType

## declare an explicit schema for a dataframe with the columns [age, name];
## each StructField is (column name, data type, nullable)
data_schema = StructType(
    fields=[
        StructField("age", IntegerType(), True),
        StructField("name", StringType(), True),
    ]
)

## build a small dataframe that uses this schema
new_df = spark.createDataFrame(
    [(1, "John"), (2, "Smith")],
    data_schema,
)


In [None]:
## Spark dataframe cell reference


df['column_name'] #it just returns a Column object, not the values inside the column

## to see values inside a column (something like pandas.Series)
df.select(df.age).show() #or df.select('age').show()

## to see values inside multiple columns (like df[['column1','column2']] in pandas)
df.select(df.age,df.name).show()

## rename a column (returns a new dataframe)
df.withColumnRenamed('age','age_new').show()

########################################################################################################################
## to see the top rows (something like df.head() in pandas)
df.head(3) #df.head(n) returns the first n rows as a list of Row objects
#this list can be indexed like any Python list
df.head(5)[4]  # the 5th row, comparable to df.iloc[4] in pandas


In [None]:
## Something like the apply function in pandas: withColumn returns a new dataframe with a column computed from a Column expression

new_df = df.withColumn('new_column', df.age + 1)

## Spark Filter

In [None]:
## Spark Filter

df.filter("age > 20").show() # filter accepts a SQL expression string, like a SQL WHERE clause
df.filter(df.age > 20).show() # it also accepts a Column condition, similar to boolean indexing in pandas
# PERSONAL NOTE: instead of using where in a sql query, use filter as above; the author found it much faster.
# NOTE(review): DataFrame.where() is documented as an alias of filter(), so confirm any speed difference (e.g. with df.explain()) before relying on it

## apis can be chained together like:
df.filter(df.age > 20).filter(df.name == 'John').show()
df.filter((df.age > 20) & ~(df.name == 'John')).show() #and: &, or: |, not: ~ (each condition is wrapped in parentheses)
df.filter(df.age>20).select(df.name).show()

########################################################################################################################
## the result of a filter can be collected and used like a list
filter_result = df.filter(df.age > 20).collect() #collect() returns a list of Row objects

#there is a lot we can do with this list, for example:
filter_result[0][0] #returns the first element of the first row
#Row methods that can be used include: asDict, count, index
filter_result[0].asDict() #returns the first row as a dictionary




## Spark Groupby, Agg and Functions

In [4]:
## Spark GroupBy
df.groupBy('name').count().show() #grouped data offers count, max, mean, min, sum, avg; collect_list and collect_set are pyspark.sql.functions to be used inside agg()

df.agg({"age": "max"}).show() #df.agg aggregates over the whole dataframe without groups; the dict maps column name -> aggregate function name


In [None]:
#instead of the dict form of agg we can use the column functions:
from pyspark.sql.functions import countDistinct, stddev, avg, format_number

df.select(avg('age').alias("age_average")).show() #alias is something like 'as' in sql

df.select(format_number( avg('age'),2 )).show() #format_number formats a number rounded to n decimal places (here n is 2) and returns it as a string


In [None]:
## Spark DateTime and Timestamp
from pyspark.sql.functions import dayofmonth,dayofweek,dayofyear,month,year,hour,minute,second,weekofyear,date_format
#example (assumes df has a date/timestamp column named `date` -- adjust to your data):
df.select(dayofmonth(df.date)).show() #extracts the day of the month from each value


In [None]:
## Spark OrderBy
df.orderBy(df['age'].desc()).show() #sort by age in descending order; use .asc() for ascending order

## Spark Null Value Handling

In [None]:
df.na.drop(thresh=2).show() #keep only rows with at least 2 non-null values (rows with fewer are dropped)
df.na.drop(how='any').show() #any: drop rows with any null value, all: drop rows only if all values are null
df.na.drop(subset=['name']).show() #drop rows with null values in a specific column

########################################################################################################################
df.na.fill(value=0,subset=['numerical_columns']).show() #fill null values in the listed numeric columns with 0 ('numerical_columns' stands for your own column names)
#example: replace missing ages with the mean age
from pyspark.sql.functions import avg  # imported here so this cell also runs on its own
mean_age = df.select(avg(df['age'])).collect()[0][0] #collect() returns a list of rows; [0][0] is the single aggregated value
df.na.fill(value=mean_age,subset=['age']).show()



## Save Spark DataFrame

In [None]:
## Register the dataframe as a temporary view so it can be queried with SQL
## (the view is tied to the current SparkSession; this call does not write any files to HDFS)
df.createOrReplaceTempView("my_table")

view_df = spark.sql("select * from my_table") #returns the query result as a new dataframe