# Python

### Read data into Dataframe

In [3]:
textFile_df = spark.read.text(r"LabData/README.md")
textFile_df.head(2)
textFile_df.count()
textFile_df.first()
type(textFile_df)

# RDD ---
# textFile2 = sc.textFile(r"LabData/README.md")
# type(textFile2)

pyspark.sql.dataframe.DataFrame

In [None]:
## Scala Code. creates Dataset
# val textFile = spark.read.textFile(r"LabData/README.md")
# textFile.head(5)
# textFile.count()

#### Dataframe Filter -> Filter lines which contains 'Spark'

In [None]:
linesWithSpark = textFile_df.filter(textFile_df.value.contains('Spark'))
# linesWithSpark = textFile.filter(lambda line: 'Spark' in textFile) --> This wont work
# linesWithSpark2 = textFile2.filter(lambda line: 'Spark' in line)
linesWithSpark.count()

In [None]:
# Scala
# val linesWithSpark = textFile.filter(line => line.contains("Spark"))
# linesWithSpark.count()

#### Dataframe Map --> use select in Dataframes. No Map?

In [None]:
# WordCount
from pyspark.sql.functions import *

# Word list per line
wordList_df = textFile_df.select(split(textFile_df.value, " ").name('wordList'))
wordList_df.collect()
wordList_df.show()


# Word Count per line
wordCount_Perline_df = textFile_df.select(size(split(textFile_df.value, " ")).name('WordCountPLine'))
wordCount_Perline_df.collect()
max_WordCount_Perline_df = wordCount_Perline_df.agg(max(col("WordCountPLine")))
max_WordCount_Perline_df.collect()
max_WordCount_Perline_df.show()


# # Word Count Per word
wordCount_PerWord_df = textFile_df.select(explode(split(textFile_df, " ")).alias("WordCount")).groupBy("WordCount")
wordCount_PerWord_df = textFile_df.select(explode(split(textFile_df.value, "\s+")).name("word")) \
                                .groupBy("word").count().orderBy(desc("count"))
wordCount_PerWord_df.collect()
wordCount_PerWord_df.show(1)

#### Read csv file -> Different ways

In [None]:
# Using read.csv without/with option()
df1 = spark.read.csv(r"LabData/Err.csv")
df2 = spark.read \
    .option("header","true") \
    .csv(r"LabData/Err.csv")

# Using load() with option() and format()
df3 = spark.read \
    .format("csv") \
    .option("header","true") \
    .option("mode", "DROPMALFORMED") \
    .load(r"LabData/Err.csv")

print(df1.count())
print(df2.count())
print(df3.count())
print(type(df1))
print(type(df2))
print(type(df3))

## Data Manipulations --> Sample file 'Samp_MKT.csv'

In [None]:
# Read input csv file
mkt_df = spark.read \
    .format("csv") \
    .option("header","true") \
    .option("mode", "DROPMALFORMED") \
    .option("inferSchema", "true") \
    .load(r"LabData/Samp_MKT.csv")
mkt_df.show(5)

In [None]:
# print schema of dataframe
mkt_df.printSchema()
# mkt_df.describe()

In [5]:
# Column List and No of columns
mkt_df.columns
len(mkt_df.columns)

15

In [None]:
# Show Statistics of the dataframe - shows for numeric fields, otherwise null
# mkt_df.describe().show()
mkt_df.select('AMOUNT').describe().show()

# # Non non numeric fields, it shows count, min, max. No mean or std
mkt_df.select('EFFORT_KEY').describe().show()

In [None]:
# Select fields - one or multiple fields
mkt_df.select('EFFORT_KEY').show(5)
mkt_df.select('EFFORT_KEY', 'STATE').show(5)
mkt_df.select(['EFFORT_KEY', 'STATE']).show(5)

In [17]:
# distinct --> for a column, for composite columns, for entire row
# mkt_df.select('EFFORT-KEY').distinct().count()
# Can also be used to dropduplicates. To drop dups you can also use dropduplicates() instead of distinct(). Both works

mkt_df.select(['STATE','ZIP']).distinct().count()
mkt_df.select(['STATE','ZIP']).dropDuplicates().count()
mkt_df.distinct().count()

1001

In [None]:
# Crosswise Frequency | Choose atleast one categorical field if not both. Takes two arguments only. 
# Shows frequency of each value of 2nd argument column for 1st argument col in a matrix/Dataframe

mkt_df.crosstab('STATE', 'AMOUNT').show()
mkt_df.crosstab('FULFILL_STATUS', 'STATE').show()

In [None]:
# Work with null values | dropna() --> drop nulls | fillna() --> fill nulls with specified value(s)

mkt_nonull_df = mkt_df.dropna()
mkt_fillnull_df = mkt_df.fillna(-1)
mkt_nonull_df.show()
mkt_fillnull_df.show()

In [None]:
# filter rows based on values

mkt_df.filter(mkt_df.AMOUNT > 500).select(['KEYLINE', 'AMOUNT']).show(5)
mkt_df.filter((mkt_df.STATE != 'IL') & (mkt_df.AMOUNT > 300)).select('KEYLINE', 'STATE', 'AMOUNT').show(5)
mkt_df.filter(mkt_df.AMOUNT > 500).show(5)

In [37]:
import pyspark.sql.functions as sf
# GroupBY and aggregate functions

# mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'mean'}).show(3)
# mkt_df.groupBy(mkt_df.EFFORT_KEY).agg({'AMOUNT': 'std'}).show(3)
# mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'max'}).show(3)
# mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'min'}).show(3)
# mkt_df.groupBy('EFFORT_KEY').count().collect()

# mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'sum'}).show()
# mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'sum'}).agg({'sum(AMOUNT)': 'max'}).show()


df1 = mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'sum'})

## --> Renaming the Agg of SUM(AMOUNT) to an alias 'Sum_AMT' | Two methods shown below

df1 = mkt_df.groupBy('EFFORT_KEY').agg(sf.sum('AMOUNT').alias('Sum_AMT'))  # Method-1
df1 = mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'sum'}).withColumnRenamed("SUM(AMOUNT)", "Sum_Amt") # Method-2
df1.take(2)


# df2 = df1.sort('sum(AMOUNT)', ascending=False).select('EFFORT_KEY')
# df2 = df1.sort('sum(AMOUNT)', ascending=False)
# df2 = df1.sort('sum(AMOUNT)', ascending=False).select


# list1 = df2.take(2)
# print(list1)
# for items in list1:
#     mydict = items.asDict()
#     print('Value for EFK:', mydict['EFFORT_KEY'], 'is ->', mydict['sum(AMOUNT)'])

[Row(EFFORT_KEY='TDPK', Sum_AMT=750), Row(EFFORT_KEY='TC5Q', Sum_AMT=11600)]

In [None]:
# OrderBy
mkt_df.orderBy(mkt_df.CITY.desc()).select('KEYLINE', 'EFFORT_KEY', 'AMOUNT', 'CITY').show(5) 

# Note argument of orderBy does not properly work if the field contains '-'. Below doesn't work
# You will get an error  ==> 'DataFrame' object has no attribute 'EFFORT'. So use '_' in col name instead of '-' as below
# #-> mkt_df.orderBy(mkt_df.EFFORT-KEY.desc()).select('KEYLINE', 'EFFORT-KEY', 'AMOUNT', 'CITY').show(5) 
mkt_df.orderBy(mkt_df.EFFORT_KEY.desc()).select('KEYLINE', 'EFFORT_KEY', 'AMOUNT', 'CITY').show(5) 

In [None]:
# groupBy with OrderBY

mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'mean'}) \
        .orderBy(mkt_df.EFFORT_KEY.desc()).select('EFFORT_KEY', 'avg(AMOUNT)').show(5)

mkt_df.groupBy('EFFORT_KEY').agg({'AMOUNT': 'mean'}) \
        .orderBy('avg(AMOUNT)').select('EFFORT_KEY', 'avg(AMOUNT)').show(5)

## Apply SQL Queries on Dataframes

In [None]:
mkt_df.registerTempTable('MKT_TABLE')

In [None]:
# sqlContext.sql("SELECT KEYLINE, AMOUNT, STATE FROM MKT_TABLE WHERE NOT STATE = 'IL'").show(10)
sqlContext.sql("SELECT KEYLINE, AMOUNT, EFFORT_KEY, ZIP+4, STATE FROM MKT_TABLE WHERE NOT STATE = 'IL'").show(10)

In [None]:
sqlContext.sql("SELECT EFFORT_KEY, AVG(AMOUNT) as Avg FROM MKT_TABLE GROUP BY EFFORT_KEY ORDER BY Avg Desc").show(5)

# Spark Structured Streaming

### Windowing on static files

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# inputPath = "/databricks-datasets/structured-streaming/events/file-0.json"
inputPath = r"LabData/logs"
logSchema = StructType([StructField("time", TimestampType(), True), StructField("action", StringType())])

log_df = spark.read \
            .format("json") \
            .schema(logSchema) \
            .load(inputPath)
log_df.count()

In [None]:
countLog_df = log_df.groupBy(window(log_df.time, "30 minutes"), log_df.action).count()
countLog_df.show()

### Streaming codes

In [None]:
streamLog_df = spark.readStream \
                    .format("json") \
                    .schema(logSchema) \
                    .option("maxFilesPerTrigger", 1) \
                    .load(inputPath)
type(streamLog_df)

In [None]:
streamingCountsDF = streamLog_df \
                .groupBy(streamLog_df.action, window(streamLog_df.time, "1 hour")) \
                .count()
streamingCountsDF.isStreaming

In [None]:
# read the stream into stream Dataframe
logStream_df = spark.readStream \
                    .format("json") \
                    .schema(logSchema) \
                    .option("maxFilesPerTrigger", 1) \
                    .load(inputPath)

# group the Dataframe
countStream_df = logStream_df.groupBy(window(logStream_df.time, "30 minutes"), logStream_df.action) \
                    .count()

print('Am I streaming? -> ', countStream_df.isStreaming)

In [None]:
queryy = countStream_df.writeStream \
                    .format("console") \
                    .queryName("mylogs2") \
                    .outputMode("complete") \
                    .start()
query.awaitTermination()

In [None]:
query1.explain(extended = True)

In [None]:
type(spark.streams.active)

In [None]:
spark.sql("select * from mylogs2").show()  

In [None]:
# File file sinks Only Append mode is allowed. Append mode cannot work with Aggregates
query_csv = logStream_df.writeStream \
                    .format("csv") \
                    .option("checkpointLocation", r"LabData/chk_pt") \
                    .option("path", r"LabData/out") \
                    .queryName("mylogs_csv") \
                    .outputMode("append") \
                    .start()

In [None]:
query_csv.stop()