In [0]:
import pyspark

In [0]:
# https://towardsdatascience.com/the-most-complete-guide-to-pyspark-dataframes-2702c343b2e8
# https://confizlimited.udemy.com/course/spark-and-python-for-big-data-with-pyspark/learn/lecture/6688224#overview

In [0]:
# Load the `people_table` table into a Spark DataFrame.
# Query through the SparkSession (`spark`, pre-created in Databricks) rather than the
# legacy `sqlContext` entry point, which is deprecated since Spark 3.0.
# NOTE(review): assumes a table called people_table already exists in the metastore.
sdf = spark.sql('select * from people_table')

In [0]:
# In a local setup you have to create a SparkSession object yourself, as follows.
# In Databricks, a SparkSession object named `spark` is already available.

"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate() # the SparkSession is the entry point to Spark and lives in the driver; getOrCreate() reuses an existing session, so an application typically has just one
"""

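# Boilerplate for a standalone Spark application (a .py script rather than a notebook):
"""

if __name__ == '__main__':
  spark = SparkSession.builder.master('local[3]').appName('something').getOrCreate() # local[3] means: run locally in one multi-threaded JVM with 3 worker threads
  logger = Log4j(spark)  # Log4j is a custom logging helper (e.g. from lib.logger), not part of PySpark
  
  
  # ... do your processing here; when finished, stop the session (the driver):
  spark.stop()
"""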


In [0]:
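# Actions (e.g. write, collect, show, count) trigger execution; everything else (select, filter, withColumn, groupBy, ...) is a lazy transformation.
# Reading a file is lazy as well, although discovering its schema (header / inferSchema / JSON inference) may run a job.
# No transformations are performed until you call an action.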

"""
There are two types of transformations:
_Narrow dependency: each output partition needs data from only one input partition, so no data moves between partitions (e.g. where age > 40)
_Wide dependency: output partitions need data from many input partitions (e.g. group by country). This causes an internal shuffle-sort (repartitioning) of the data.
The number of partitions produced by that shuffle is configured with: spark.sql.shuffle.partitions


repartition your data by using sdf.repartition(number_of_partitions) # remember repartition is a transformation i.e. nothing will happen unless you call an action
"""

In [0]:
"""
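each spark action causes a job (inferring a file's schema triggers a job as well)
each job has multiple stages; stage boundaries are created by shuffles (wide transformations such as repartition or groupBy)
each stage has multiple tasks (one task per partition of the data it processes)

e.g. 

repartitioned_df = df.repartition(number_of_partitions)
new_df = repartitioned_df.select('column1', 'column2', 'column3').filter('column1>40').groupBy('column3').count().collect() 

All of this code causes a single job in spark (there is only one action, collect()),
and that job is split into several stages at the repartition and groupBy shuffles.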

"""

In [0]:
"""
There are two way to execute program on spark:
  1) spark interactive client: spark-shell and notebook
  2) submit job: spark-submit, databricks notebook, rest API
  
_____

batch job vs stream job

______

spark follows a master-worker (master-slave) architecture.
The cluster has its own master and worker nodes; inside a spark application, the driver acts as the master and the executors act as the workers.
When you submit a job (in cluster mode), spark asks the cluster manager to allocate a container in which the driver runs; the driver then asks the cluster manager for more containers in which the executors run.

_____

There are different types of CLUSTER MANAGERS which can work with spark
>local (this is just a simulated cluster manager which runs locally)
>YARN
>Kubernetes
>Mesos
>Standalone


______

local[3] means everything runs in a single local JVM: the driver plus 3 worker threads that execute tasks in parallel (local mode has no separate executor processes)

______

spark runs in EXECUTION MODES:
1) client : driver on client machine and executors on cluster
2) cluster : driver and executors on cluster

________

EXECUTION TOOLS:
1) IDE, Notebook
2) Job submit


___________

local+client mode+notebook/shell

yarn+client mode+notebook/shell

yarn+cluster mode+submit
"""


In [0]:
# Print the first rows of the people DataFrame as a text table (show() is an action).
sdf.show(n=20, truncate=True)

In [0]:
# Define a schema by hand instead of letting Spark infer it from the data.

from pyspark.sql.types import (StructField, StructType, IntegerType, StringType)

# each StructField is: (column name, data type, nullable)
data_schema = [
  StructField('age', IntegerType(), True),
  StructField('name', StringType(), True),
]
final_struct = StructType(fields=data_schema)

# Read the JSON file with the explicit schema attached to the reader.
sdf2 = spark.read.schema(final_struct).json('dbfs:/FileStore/tables/people_1_.json')
sdf2.printSchema()

In [0]:
%fs

ls dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/

path,name,size
dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/appl_stock_1_-1.csv,appl_stock_1_-1.csv,143130
dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/appl_stock_1_.csv,appl_stock_1_.csv,143130
dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/features.csv,features.csv,592289
dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/people_1_.json,people_1_.json,73
dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/train.csv,train.csv,12842546


In [0]:
sdf['age']                 # indexing a DataFrame returns a Column (an expression, no data)
sdf.select('age')          # select() returns a new DataFrame
sdf.select('age', 'name')  # several columns: pass them as separate arguments (a list also works)

In [0]:
sdf.head(n=2)  # a Python list holding the first two rows as Row objects

In [0]:
# Add a derived column. withColumn returns a NEW DataFrame, and nothing is assigned here,
# so `sdf` itself keeps its original columns ("temporary" column).
(sdf
 .withColumn('double_age', sdf['age'] * 2)
 .show())

In [0]:
# Register `sdf` as a temporary SQL view named 'people_table' so it can be queried with
# spark.sql(...) or a %sql cell (it replaces any existing temp view with that name).
sdf.createOrReplaceTempView(name='people_table')

In [0]:
%sql

select  * from people_table

age,name
,Michael
30.0,Andy
19.0,Justin


In [0]:
# Same query as the %sql cell, issued from Python; show() prints the result.
(spark.sql('select * from people_table')
      .show())

In [0]:
# Local setup only (Databricks already provides `spark`):
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName('ops').getOrCreate()

# Read the Apple stock CSV: the first line is the header, column types are inferred.
sdf_apple = spark.read.csv(
  'dbfs:/FileStore/shared_uploads/mariashaukat352@gmail.com/appl_stock_1_.csv',
  header=True,
  inferSchema=True,
)
sdf_apple.show()

In [0]:
# filter() accepts either a SQL expression string ...
sdf_apple.filter('close < 500').show()
# ... or a Column expression
sdf_apple.filter(sdf_apple['close'] < 500).show()
# combine Column conditions with & (and) / | (or); each condition needs its own parentheses
sdf_apple.filter(
  (sdf_apple['close'] < 500) & (sdf_apple['open'] > 200)
).show()


In [0]:
# collect() is an action: matching rows come back to the driver as a list of Row
# objects, so only collect results that are small enough to fit in driver memory.
results = sdf_apple.where(sdf_apple['close'] > 700).collect()
results

In [0]:
results[0]['Close']  # a Row can be indexed by field name (same value as .asDict()['Close'])

In [0]:
# transformations

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

def date_fmt(df, fmt, fld):
  """Parse column `fld` of `df` into a date column using the Spark datetime pattern `fmt`.

  Returns a new DataFrame in which `fld` is replaced by to_date(df[fld], fmt);
  `df` itself is not modified. Pattern letters are case-sensitive
  (e.g. 'MM' is month-of-year, 'mm' is minute-of-hour).
  """
  parsed_col = to_date(df[fld], fmt)
  return df.withColumn(fld, parsed_col)


In [0]:
# Instead of loading data from a file, build a small test DataFrame in memory.

schema = StructType([
  StructField('name', StringType()),
  StructField('date', StringType()),
])

rows = [Row('Maria', '26/09/1995'), Row('Ahsan', '09/09/1985')]
my_rdd = spark.sparkContext.parallelize(rows, 2)  # distribute the rows across 2 partitions
my_sdf = spark.createDataFrame(my_rdd, schema)
my_sdf.printSchema()

In [0]:

# Parse the 'date' strings (day/month/year, e.g. '26/09/1995') into a DateType column.
# Spark datetime patterns are case-sensitive: 'MM' is month-of-year while 'm' is
# minute-of-hour, so a pattern like 'd/m/y' cannot read the month correctly.
my_sdf_transformed = date_fmt(my_sdf, 'dd/MM/yyyy', 'date')
my_sdf_transformed.printSchema()
my_sdf_transformed.show()

In [0]:
%fs

ls databricks-datasets/

path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0


In [0]:
%fs
ls dbfs:/databricks-datasets/airlines/part-00000

path,name,size
dbfs:/databricks-datasets/airlines/part-00000,part-00000,67108879


In [0]:
"""
data_schema = StructType([StructField('Year', IntegerType(), True),
                          StructField('Month', IntegerType(), True),
                          StructField('DayOfMonth', IntegerType(), True),
                          StructField('DayOfWeek', IntegerType(), True),
                          StructField('DepTime', IntegerType(), True),
                          StructField('CRSDepTime', IntegerType(), True),
                          StructField('ArrTime', IntegerType(), True),
                          StructField('CRSArrTime', IntegerType(), True),
                          StructField('UniqueCarrier', StringType(), True),
                          StructField('FlightNum', IntegerType(), True),
                          StructField('TailNum', IntegerType(), True),
                          StructField('ActualElapsedTime', IntegerType(), True),
                          StructField('CRSElapsedTime', IntegerType(), True),
                          StructField('AirTime', IntegerType(), True),
                          StructField('ArrDelay', IntegerType(), True),
                          StructField('DepDelay', IntegerType(), True),
                          StructField('Origin', StringType(), True),
                          StructField('Dest', StringType(), True),
                          StructField('Distance', IntegerType(), True),
                          StructField('TaxiIn', StringType(), True),
                          StructField('TaxiOut', StringType(), True)])
"""

# Read the airline CSV, letting Spark infer column types (schema inference runs a job).
air_data = spark.read.csv(
  'dbfs:/databricks-datasets/airlines/part-00000',
  header=True,
  inferSchema=True,
)
air_data.printSchema()
air_data.show(10)

In [0]:
from pyspark.sql.functions import *

# A column can be given to select() as a name string ...
air_data.select('Year').show(n=10)
# ... or as a Column object via attribute access; without an action only the DataFrame repr is displayed
air_data.select(air_data.Year)


In [0]:
# select() alone is a transformation: this line only builds a plan, nothing runs
air_data.select('IsArrDelayed', 'Cancelled')
# you can also put a SQL expression inside select() using expr()

air_data.select("Year", "Month", "DayOfWeek").show(10)

# Build a flight date from its parts with a SQL expression.
# The calendar day comes from DayOfMonth (DayOfWeek is the weekday, not the day of the month),
# and the pattern uses 'M' (month-of-year); lowercase 'm' would mean minute-of-hour.
air_data.select(
  expr("to_date(concat(Year, '/', Month, '/', DayOfMonth), 'y/M/d') as FlightDate")
).show(10)

In [0]:
from pyspark.sql import functions as f

# Simple (whole-DataFrame) aggregations written with Column functions
air_data.select(
  f.count('*').alias('count'),
  f.sum('Distance').alias('sum'),
  f.countDistinct('Dest').alias('distinct_dest'),
).show(10)

# The same kind of aggregation written as SQL expression strings
air_data.selectExpr(
  'count(*) as count',
  'sum(distance) as sum',
).show()



In [0]:
# Print the documentation of pyspark.sql.functions.count (imported above as `f`).
help(f.count)

In [0]:
# Number of flights per destination; groupBy is a wide transformation (it shuffles).
(air_data
 .groupBy('Dest')
 .count()
 .show(10))
# Next: use a user-defined function (UDF) to change MSY into ABC and GEG into XYZ

In [0]:


def change_dest(dest):
  """Map selected airport codes to replacement codes; return any other value unchanged.

  'MSY' -> 'ABC' and 'GEG' -> 'XYZ'; everything else (including None) is returned as-is.
  Plain Python, so it can be wrapped as a Spark UDF.
  """
  for code, replacement in (('MSY', 'ABC'), ('GEG', 'XYZ')):
    if dest == code:
      return replacement
  return dest
  
  
# --- UDF used through the Column (DataFrame) API ---
# udf() wraps the Python function and declares its return type (StringType).
change_dest_udf = udf(change_dest, StringType())

# withColumn replaces the existing 'Dest' column with the UDF output
air_data_temp = air_data.withColumn('Dest', change_dest_udf('Dest'))
# sanity check: every MSY was mapped to ABC, so this should show no rows
air_data_temp.filter(air_data_temp['Dest'] == 'MSY').show(10)



# --- UDF used inside SQL / string expressions ---
# spark.udf.register adds the function to the session catalog under a SQL name,
# which is what makes it callable from expr() / spark.sql().
spark.udf.register('change_dest_udf', change_dest, StringType())

# Confirm the registration by printing the matching catalog entries.
# A plain loop (not a list comprehension) because only the print side effect is wanted;
# `catalog_fn` avoids visually clashing with the `f` alias for pyspark.sql.functions.
for catalog_fn in spark.catalog.listFunctions():
  if 'change_dest_udf' in catalog_fn.name:
    print(catalog_fn)

# use the registered UDF inside a string expression
air_data_temp2 = air_data.withColumn('Dest', expr("change_dest_udf(Dest)"))
air_data_temp2.filter(air_data_temp2['Dest'] == 'MSY').show(10)  # should also show no rows

In [0]:
# There are three types of aggregation: simple aggregations, grouping aggregations and window aggregations

"""

# boiler plate code
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from lib.logger import Log4j

if __name__=='__main__':
  spark = SparkSession\
  .builder\
  .appName('Aggregation_demo')\
  .master('local[2]').getOrCreate()

  logger = Log4j(spark)

"""







  

In [0]:
# Create a small DataFrame from a Python list for quick testing.

data_list = [
  ('Maria', 26, 9, 1995),
  ('Ahsan', 9, 9, 85),
  ('Bubu', 1, 1, 19),
]

raw_sdf = spark.createDataFrame(data_list).toDF('name', 'day', 'month', 'year')

raw_sdf.printSchema()

from pyspark.sql.functions import *
raw_sdf = raw_sdf.withColumn("id", monotonically_increasing_id())
raw_sdf.show()

# monotonically_increasing_id(): IDs are increasing and unique across partitions,
# but not necessarily consecutive (per the Spark docs)



In [0]:
# Convert 2-digit years to 4-digit years:
#   year < 20  -> 20xx
#   year < 100 -> 19xx
#   otherwise  -> unchanged

# Option 1: SQL CASE expression inside expr()
raw_sdf1 = raw_sdf.withColumn('year', expr("""
case
when year<20 then year+2000
when year<100 then year+1900 
else year
end"""))

# This also works if year is a string instead of an int, but then you need casting:
# inline casting -> cast(year as int) inside the SQL text
# column casting -> cast the resulting column in withColumn:
#raw_sdf = raw_sdf.withColumn('year', expr("""
#case
#when year<20 then year+2000
#when year<100 then year+1900 
#else year
#end""").cast(IntegerType()))

# Show the converted frame; withColumn left raw_sdf itself unchanged.
raw_sdf1.show()


# Option 2: the same CASE logic with the Column API (when / otherwise)
raw_sdf2 = raw_sdf.withColumn(
  'year',
  when(col('year') < 20, col('year') + 2000)
  .when(col('year') < 100, col('year') + 1900)
  .otherwise(col('year')),
)

raw_sdf2.show()


In [0]:
from pyspark.sql import Row

# make a function that works on rdd rows:
def make_new_row(row):
  """Return a copy of `row` with an extra 'Average' field equal to (High + Low) / 2.

  Meant to be passed to rdd.map(), which calls it once per Row. `row` must contain
  'High' and 'Low' fields (presumably numeric prices from the Apple stock CSV —
  TODO confirm the inferred types). An existing 'Average' field would be overwritten.
  """
  fields = row.asDict()
  average = (fields['High'] + fields['Low']) / 2  # a more complex per-row computation could go here
  # copy every existing field in order, then add/overwrite 'Average', and rebuild a Row
  return Row(**{**fields, 'Average': average})


# Round-trip through the RDD API to apply a row-wise Python function.
apple_rdd = sdf_apple.rdd  # DataFrame -> RDD of Row objects
new_apple_rdd = apple_rdd.map(make_new_row)  # map() calls make_new_row once per Row
# Rebuild a DataFrame from the RDD (schema inferred from the Rows) via the SparkSession;
# the legacy SQLContext entry point is deprecated.
# NOTE: the same column could be added without leaving the DataFrame API, e.g. with withColumn.
new_apple_sdf = spark.createDataFrame(new_apple_rdd)
new_apple_sdf.show()

In [0]:
air_data.show(n=10)  # print the first 10 rows of the airline data

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Keep only the columns needed for the ranking.
temp_air_df = air_data.select(['Year', 'Month', 'Origin', 'Dest', 'Distance', 'ArrDelay'])
# Cast ArrDelay to int so the ordering below is numeric.
# NOTE(review): presumably inferSchema read it as a non-integer type — confirm with air_data.printSchema()
temp_air_df = temp_air_df.withColumn('ArrDelay', temp_air_df.ArrDelay.cast(IntegerType()))

# Rank flights by arrival delay (largest first) within each (Dest, Origin, Month) group.
window_spec = Window.partitionBy(['Dest', 'Origin', 'Month']).orderBy(F.desc('ArrDelay'))
temp_air_df.withColumn('rank', F.rank().over(window_spec)).show(100)

In [0]:
# lag() is a window function, so it must be evaluated over a window spec via .over(...).
# This spec orders all rows by Date; a .partitionBy(...) could be added to lag within groups.
# (Without partitionBy, Spark has to process all rows in a single partition.)
window_spec2 = Window.orderBy('Date')
sdf_apple.withColumn(
  'Volume_7lag',
  F.lag('Volume', 7).over(window_spec2),  # Volume from 7 rows earlier (null when there is none)
).show()

In [0]:
# Rolling 7-row window ordered by Date: the 6 preceding rows plus the current row.
# Using (Window.unboundedPreceding, Window.currentRow) instead of (-6, 0) gives a running total over all previous rows.
window_spec3 = Window.orderBy('Date').rowsBetween(-6, 0)

sdf_apple = (
  sdf_apple
  .withColumnRenamed('Adj Close', 'Adj_close')
  # cast to int "for ease" — NOTE: this drops the fractional part of the price
  .withColumn('Adj_close', F.col('Adj_close').cast(IntegerType()))
  .withColumn('Adj_close_rolling', F.sum('Adj_close').over(window_spec3))
)
sdf_apple.show()

In [0]:
# count the number of cancelled flights to each destination on a specific date

temp_air_df = air_data.select(['Year', 'Month', 'DayOfMonth', 'FlightNum', 'Cancelled', 'Origin', 'Dest'])
# temp_air_df.show()

In [0]:

# Cancelled / not-cancelled flight counts per date, with one pair of columns per destination.
# Explicit conditional counts are used instead of count - sum: F.when(...) without
# otherwise() yields null, and F.count ignores nulls. So each column counts exactly the
# rows with Cancelled == 1 (or == 0), without relying on the values being strictly 0/1
# for the arithmetic to work.
# Passing the list of destinations to pivot('Dest', [...]) would avoid the extra job Spark
# runs to find the distinct Dest values.
air_df_pivot = (
  temp_air_df
  .groupBy('Year', 'Month', 'DayOfMonth')
  .pivot('Dest')
  .agg(
    F.count(F.when(F.col('Cancelled') == 1, True)).alias('Cancelled'),
    F.count(F.when(F.col('Cancelled') == 0, True)).alias('NotCancelled'),
  )
)

air_df_pivot.show(1)

In [0]:
# There are two count functions available:
# pyspark.sql.functions.count(col) is an aggregate function; it counts the non-null values of a column
# df.count() operates on the entire DataFrame and returns its number of rows (it is an action)

In [0]:

# Reminder: sdf['col_name'] is a Column, while sdf.select('col_name') is a DataFrame.

# head(n) / tail(n) return the first / last n rows as a list of Row objects.
temp_air_df.tail(num=1)

In [0]:
# Just verifying the results: count the non-cancelled flights to ABE on 1987-10-03 directly.
temp_air_df.filter(
  (temp_air_df.Dest == 'ABE')
  & (temp_air_df.Year == 1987)
  & (temp_air_df.Month == 10)
  & (temp_air_df.DayOfMonth == 3)
  & (temp_air_df.Cancelled == 0)
).count()
# First row of a DataFrame: temp_air_df.first() returns a single Row;
# temp_air_df.head(1) / temp_air_df.take(1) return a one-element list.


In [0]:
# pyspark.sql.functions has no `getRows` (help(F.getRows) raises AttributeError).
# Rows are retrieved with DataFrame methods (first, head, take, tail, collect);
# show the documentation for first(), which returns the first row.
help(temp_air_df.first)