# 4. SQL and DataFrames

References:

* Spark-SQL, <https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes>


# 4.1 Example Walkthrough
Follow the Spark SQL and DataFrame examples below!

### Initialize PySpark

First, we install PySpark with pip and then create a `SparkContext` (`sc`) and a `SparkSession` (`spark`).

```python
!pip install -q pyspark
```

```python
# Inspect the CPUs available in this environment
!cat /proc/cpuinfo
```
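Instead of reading the full `/proc/cpuinfo` dump, the number of usable cores can also be checked from Python, which helps when choosing `n` for `local[n]` later on. A minimal sketch; `available_cores` is a hypothetical helper name, and `os.sched_getaffinity` is only available on some platforms (e.g. Linux), hence the fallback:

```python
import os


def available_cores() -> int:
    """Number of CPU cores this process may run on."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: count the cores in this process's CPU affinity mask
        return len(os.sched_getaffinity(0))
    # Other platforms: total number of logical CPUs (may be None, so fall back to 1)
    return os.cpu_count() or 1


print(available_cores())
```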

In [None]:
# Initialize PySpark
import os, sys
APP_NAME = "PySpark Lecture"
SPARK_MASTER="local[2]"
import pyspark
import pyspark.sql
from pyspark.sql import Row
conf=pyspark.SparkConf()
conf=pyspark.SparkConf().setAppName(APP_NAME).set("spark.local.dir", os.path.join(os.getcwd(), "tmp"))
sc = pyspark.SparkContext(master=SPARK_MASTER, conf=conf)
spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")

### Hello, World!

Load the data, split each line into fields, and collect the records into the driver's memory:

```python
!wget https://raw.githubusercontent.com/scalable-infrastructure/exercise-2024/main/data/example.csv
```

```python
# Load the text file using the SparkContext
csv_lines = sc.textFile("example.csv")

# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))

# Collect the dataset into the driver's memory (only safe for small datasets)
data.collect()
```

### Creating Rows

Create `pyspark.sql.Row` objects from the data so that DataFrames can be built from them:

```python
# Convert a CSV line into a pyspark.sql.Row
def csv_to_row(line):
    parts = line.split(",")
    row = Row(
        name=parts[0],
        company=parts[1],
        title=parts[2],
    )
    return row

# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)
```
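`line.split(",")` works as long as no field contains a comma. If a field were quoted, e.g. a company written as `"Acme, Inc."` (a hypothetical value; we have not checked `example.csv` for this), the split would produce too many parts. A minimal sketch using Python's standard `csv` module, which honours double quotes; `split_csv_line` is a hypothetical helper name:

```python
import csv
from typing import List


def split_csv_line(line: str) -> List[str]:
    """Split a single CSV line into fields, respecting double-quoted fields."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list
    return next(csv.reader([line]))


print(split_csv_line('Jane Doe,"Acme, Inc.",CEO'))  # ['Jane Doe', 'Acme, Inc.', 'CEO']
```

To use it, replace `line.split(",")` in `csv_to_row` with `split_csv_line(line)`.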

### Creating DataFrames from RDDs

Use the `RDD.toDF()` method to create a DataFrame, register it as a temporary view for Spark SQL, and count the jobs per person with a SQL query.

```python
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()

# Register the DataFrame as a temporary view for Spark SQL
# (createOrReplaceTempView replaces the deprecated registerTempTable)
rows_df.createOrReplaceTempView("executives")

# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
  name,
  COUNT(*) AS total
FROM executives
GROUP BY name
""")
job_counts.show()

# Go back to an RDD
job_counts.rdd.collect()
```
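On a small sample, the SQL aggregation can be cross-checked in plain Python. The sketch below, with the hypothetical helper `count_per_key`, computes what `SELECT name, COUNT(*) ... GROUP BY name` computes, keyed on the first field of each record:

```python
from collections import Counter
from typing import Iterable, List, Sequence, Tuple


def count_per_key(records: Iterable[Sequence[str]], key_index: int = 0) -> List[Tuple[str, int]]:
    """Plain-Python equivalent of SELECT key, COUNT(*) ... GROUP BY key."""
    counts = Counter(record[key_index] for record in records)
    # Sort by key: like SQL GROUP BY, Counter gives no meaningful order on its own
    return sorted(counts.items())


sample = [("Alice", "Acme", "CEO"), ("Bob", "Initech", "CTO"), ("Alice", "Globex", "Board")]
print(count_per_key(sample))  # [('Alice', 2), ('Bob', 1)]
```

Assuming every line has at least three fields, `count_per_key(data.collect())` should match `sorted(tuple(r) for r in job_counts.collect())`. Spark does not guarantee the row order of a `GROUP BY` result, so sort both sides (or add `ORDER BY name` to the query) before comparing.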

# 4.2-4.9 NASA Dataset

4.2 Create a Spark SQL table with fields for the IP address/host and the HTTP response code from the NASA log file!
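A minimal parsing sketch, assuming the NASA log lines follow the Common Log Format (`host ident authuser [timestamp] "request" status bytes`). Lines that do not match return `None` so they can be filtered out; `parse_log_line` and `LOG_PATTERN` are hypothetical names, and the sample is an illustrative line in that format:

```python
import re
from typing import Optional, Tuple

# Common Log Format: host ident authuser [timestamp] "request" status bytes
LOG_PATTERN = re.compile(
    r'^(?P<host>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<size>\S+)$'
)


def parse_log_line(line: str) -> Optional[Tuple[str, int]]:
    """Return (host, response_code) for a well-formed log line, or None if it does not match."""
    match = LOG_PATTERN.match(line.strip())
    if match is None:
        return None
    return match.group("host"), int(match.group("status"))


sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(parse_log_line(sample))  # ('199.72.81.55', 200)
```

In Spark, this could be applied as `sc.textFile(path).map(parse_log_line).filter(lambda r: r is not None)`, where `path` stands for wherever the log file was downloaded; the result can then be turned into a DataFrame with `spark.createDataFrame(parsed, ["host", "response_code"])` and registered as a view.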

4.3 Run an SQL query that outputs the number of occurrences of each HTTP response code!

4.4 Implement the same Query using the Dataframe API!

4.5 Cache the DataFrame and run the same query again! Measure the runtime of the caching step and the execution time of the query!
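Spark's `cache()` is lazy: it only marks the DataFrame for caching, and the data is materialized by the first action that runs afterwards. Timing that first action therefore approximates the caching cost, while later runs of the query measure execution against the cache. A minimal timing helper based on `time.perf_counter`; `time_runs` is a hypothetical name:

```python
import time
from typing import Callable, List


def time_runs(action: Callable[[], object], repeats: int = 3) -> List[float]:
    """Call `action` `repeats` times and return each run's wall-clock time in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()  # with Spark, this must trigger an action (count, collect, ...) to do any work
        durations.append(time.perf_counter() - start)
    return durations


# Plain-Python stand-in for a Spark action, so the helper can be tried without Spark
print(len(time_runs(lambda: sum(range(100_000)), repeats=3)))  # 3
```

For example, with `logs_df` as a placeholder for the DataFrame from 4.2: call `logs_df.cache()`, then `time_runs(logs_df.count, repeats=1)` for the caching step and `time_runs(lambda: logs_df.groupBy("response_code").count().collect())` for the cached query.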

4.6 Performance Analysis - Weak Scaling:
* Create RDDs with 2x, 4x, 8x and 16x the size of the NASA log dataset! Persist the datasets in the Spark cache! Use an appropriate number of cores (e.g. 8 or 16)!
* Measure the response times for all datasets using a constant number of cores!
* Plot the results!
* Explain the results!
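One way to build the 2x to 16x datasets is to union the base dataset with itself repeatedly: each self-union doubles the data, so 16x takes four unions. The sketch assumes the scale factor is a power of two; `replicate` and `time_per_unit` are hypothetical helper names. Dividing each measured runtime by its scale factor gives the time per 1x of data, which stays roughly constant when the runtime grows linearly with the data size at a fixed number of cores:

```python
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def replicate(dataset: T, factor: int, union: Callable[[T, T], T] = lambda a, b: a.union(b)) -> T:
    """Return `dataset` replicated `factor` times by repeated self-union.

    `factor` must be a power of two. By default `.union` is used, which exists on
    both RDDs and DataFrames; pass another `union` to try it on plain lists.
    """
    if factor < 1 or factor & (factor - 1):
        raise ValueError("factor must be a power of two")
    result = dataset
    while factor > 1:
        result = union(result, result)  # doubles the size
        factor //= 2
    return result


def time_per_unit(runtimes: Dict[int, float]) -> Dict[int, float]:
    """Map each scale factor to its runtime divided by the factor (seconds per 1x of data)."""
    return {factor: seconds / factor for factor, seconds in sorted(runtimes.items())}


print(replicate([1, 2], 4, union=lambda a, b: a + b))  # [1, 2, 1, 2, 1, 2, 1, 2]
```

In the notebook this could look like `scaled = {f: replicate(base_rdd, f).cache() for f in (2, 4, 8, 16)}` followed by a `count()` on each dataset to fill the cache, where `base_rdd` is a placeholder for the parsed NASA RDD.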



4.7 Performance Analysis - Strong Scaling:

* **Measure the query runtime with 1, 2 and 4 worker threads (`local[n]`) for the 1x and 16x datasets!** Keep the datasets cached in memory! Note that the Colab environment only has two cores!
* Compute the speedup and efficiency!
* Plot the results!
* Explain the results!
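Speedup and efficiency are commonly defined relative to the single-thread runtime: S(n) = T(1) / T(n) and E(n) = S(n) / n. A minimal sketch that turns measured runtimes into both numbers; `speedup_and_efficiency` is a hypothetical helper name and the runtimes below are made-up values. Note that only one `SparkContext` can be active at a time, so switching between `local[1]`, `local[2]` and `local[4]` means calling `spark.stop()`, creating a new context with the new master, and re-caching the data before measuring.

```python
from typing import Dict, Tuple


def speedup_and_efficiency(runtimes: Dict[int, float]) -> Dict[int, Tuple[float, float]]:
    """Map {threads: seconds} to {threads: (speedup, efficiency)} relative to 1 thread.

    speedup S(n) = T(1) / T(n); efficiency E(n) = S(n) / n
    """
    if 1 not in runtimes:
        raise ValueError("a 1-thread baseline runtime is required")
    baseline = runtimes[1]
    result = {}
    for threads, seconds in sorted(runtimes.items()):
        speedup = baseline / seconds
        result[threads] = (speedup, speedup / threads)
    return result


# Hypothetical runtimes in seconds for 1, 2 and 4 threads
print(speedup_and_efficiency({1: 8.0, 2: 5.0, 4: 4.0}))
```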