# Spark Notes

## Installing Spark 
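- A Java runtime must be installed and discoverable; without it, creating the session fails with "Unable to locate a Java Runtime".
- The `pyspark` package must be installed in the kernel's Python environment.
- Create a `SparkSession` before using Spark.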

In [9]:
!pip install pyspark py4j

DEPRECATION: Loading egg at /Users/emilyng/Library/Python/3.11/lib/python/site-packages/decord-0.6.0-py3.11-macosx-10.9-universal2.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m


In [10]:
# Entry point for the DataFrame / SQL API
from pyspark.sql import SparkSession

# Configure a local session (application name + 2g of driver memory) ...
builder = (
    SparkSession.builder
    .appName("MyLocalSparkSession")
    .config("spark.driver.memory", "2g")
)
# ... and obtain it
spark = builder.getOrCreate()

# Sanity check: print the version reported by the session
print(spark.version)


Unexpected exception formatting exception. Falling back to standard exception


The operation couldn’t be completed. Unable to locate a Java Runtime.
Please visit http://www.java.com for information on installing Java.

/opt/homebrew/lib/python3.11/site-packages/pyspark/bin/spark-class: line 97: CMD: bad array subscript
head: illegal line count -- -1
Traceback (most recent call last):
  File "/Users/emilyng/Library/Python/3.11/lib/python/site-packages/IPython/core/interactiveshell.py", line 3526, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/qs/szbbnwjs3xqfhs8mjs1pb1xr0000gn/T/ipykernel_39372/3789082066.py", line 8, in <module>
    .getOrCreate()
     ^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/sql/session.py", line 497, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/context.py", line 515, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/homebrew/lib/python3.

## Databricks basics

In [None]:
%fs ls # file name e.g. /databricks-datasets/learning-spark-v2/flights/

In [None]:
%fs head # e.g. /databricks-datasets/learning-spark-v2/flights/summary-data/csv/2010-summary.csv

## Dataframe manipulations

In [None]:
# Creating DataFrames

# spark.range(start, end, step) produces a single column of numbers named "id"
df = spark.range(start=2, end=1000, step=2)

# Adding a derived column: df.withColumn(<column name>, <column expression>)


In [None]:
# join, show, groupBy, count, orderBy, etc.

# Explicit import so this cell does not depend on the star-import further down the
# notebook; the F. prefix also keeps Python's built-in sum/max from being shadowed.
from pyspark.sql import functions as F

# join() is an inner join by default
# other options: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti
df1.join(df2, df1.id == df2.id).show(10)

display(df.limit(5))

# Comparisons must be parenthesised when combined: in Python `|` binds tighter
# than `==`, so `a | b == 123` would be evaluated as `(a | b) == 123`.
# The sort key must be one of the selected columns, since only those survive
# the select()/distinct().
(df.select("column1", "column2")
   .where(F.col("column1").isNotNull() | (F.col("column2") == 123))
   .distinct()
   .orderBy("column1", ascending=False)
   .show(10))

# aggregating over all rows
df.select(F.sum("column1"), F.avg("column2"), F.max("column3")).show()

# use F.year() (or YEAR() in Spark SQL) to extract the year from timestamp columns

# drop columns
df.drop("column1", "column2").show()

# rename a column
df.withColumnRenamed("column1", "new_column1").show()

### Column creations

In [None]:
# ETL: create several columns in one chain, dropping columns along the way.
# Explicit import so the cell runs on a fresh kernel without relying on the
# star-import that only appears later in the notebook.
from pyspark.sql import functions as F

(df
    # parse the "dd/MM/yyyy" string into a timestamp, then drop the source column
    .withColumn("new_column", F.to_timestamp(F.col("old_col"), "dd/MM/yyyy")).drop("old_col")
    # difference between the parsed timestamp and col3
    .withColumn("new_col2", F.col("new_column") - F.col("col3")).drop("old_col2")
    # strip the "$" sign and cast the remaining text to a double
    .withColumn("new_col4", F.translate(F.col("col4"), "$", "").cast("double"))
    .show())

In [None]:
# Using SQL expressions through expr()
# Explicit import so the cell runs on a fresh kernel without relying on the
# star-import that only appears later in the notebook.
from pyspark.sql import functions as F

# Cast the delay and distance columns to integers
df = (df
          .withColumn("delay", F.expr("CAST(delay as INT) as delay"))
          .withColumn("distance", F.expr("CAST(distance as INT) as distance")))

# Derive a status label from the delay. Built from `df`, because no DataFrame
# named `foo` is defined anywhere in this notebook.
foo2 = df.withColumn("status", F.expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))

In [None]:
# Print the schema of the DataFrame
df.printSchema()

### handling null values

In [None]:
# Keep only the rows in which "host_is_superhost" is not null
noNullsDF = df.dropna(subset=["host_is_superhost"])

## Spark RDD codes

In [None]:
# Number of partitions of the RDD underlying the DataFrame
df.rdd.getNumPartitions()

In [None]:
from pyspark.sql.functions import spark_partition_id
# Add a "partition_id" column to see which rows belong to which partition
df.withColumn("partition_id", spark_partition_id())

In [None]:
from pyspark import StorageLevel

# Option 1: cache() marks the DataFrame for caching at the default storage level
df.cache()

# Option 2: persist() with an explicit storage level (here: on disk only, not in memory).
# Calling persist() on an already-cached DataFrame does not change its storage level,
# so release the existing cache first.
df.unpersist()
df.persist(StorageLevel.DISK_ONLY)

# Note: Spark only stores NEW queries. A different variable with the same query plan
# does not produce another cached copy.
# TODO: check how this behaves for randomly generated arrays.

df.unpersist()

### N.B. Spark is lazy, so nothing is cached until an action is called
df.cache().count()  # count() is the action that actually materialises the cache

In [None]:
# creating and caching views in Spark

# Register the DataFrame as a temporary view called "dfTable" so SQL can refer to it
df.createOrReplaceTempView("dfTable")
# Cache the view through a SQL statement
spark.sql("CACHE TABLE dfTable")

# Query the view with SQL and show the row count
spark.sql("SELECT count(*) FROM dfTable").show()

## ML Pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Training documents, one (id, text, label) tuple per row
training_rows = [
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
]
training = spark.createDataFrame(training_rows, schema=["id", "text", "label"])

In [None]:
# Three-stage ML pipeline: tokenizer -> hashingTF -> lr
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(
    inputCol=tokenizer.getOutputCol(),  # read whatever column the tokenizer writes
    outputCol="features",
)
lr = LogisticRegression(maxIter=10, regParam=0.001)

stages = [tokenizer, hashingTF, lr]
pipeline = Pipeline(stages=stages)

In [None]:
# Fit the pipeline to the training documents; the fitted result is kept in `model`
model = pipeline.fit(training)

In [None]:
# Make predictions on the training documents and print the columns of interest
# (everything except 'rawPrediction').
pred_train = model.transform(training)
pred_train.drop('rawPrediction').show() # alternative: .show(truncate = False)

In [None]:
# Accuracy on the test set
# NOTE(review): `test` (the test data) is not created anywhere in this notebook —
# define it before running this cell.
pred_test = model.transform(test)
predictionAndLabels = pred_test.select("prediction", "label")

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(predictionAndLabels)
print(f"Test set accuracy = {test_accuracy}")

## Delta Lake

**TODO:** add this section from Demo 4.

## Reading/Saving/Loading data 

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Schema creation: add the fields one by one as (name, type, nullable)
schema = (StructType()
          .add("name", StringType(), True)
          .add("age", IntegerType(), True))


In [None]:
# Read the CSV file (with a header row) using the explicit schema
fire_df = spark.read.schema(schema).option("header", True).csv(sf_fire_file)

In [None]:
# Save the DataFrame as Parquet in "overwrite" mode
# (on Databricks it is saved in DBFS; inspect with: %fs ls /tmp/fireServiceParquet/)
df.write.mode("overwrite").parquet("/tmp/fireServiceParquet/")

In [None]:
# Load the Parquet data back into a DataFrame
df = spark.read.parquet("/tmp/fireServiceParquet/")

In [None]:
# Reading CSV files

# Tab-separated file with a header row; column types are inferred
airports = spark.read.csv(airports_path, header="true", inferSchema="true", sep="\t")
airports.createOrReplaceTempView("airports_na")

# Header row, inferred types, multi-line records, and '"' as the escape character
rawDF = (spark.read
         .options(header="true", inferSchema="true", multiLine="true", escape='"')
         .csv(filePath))
