# 5.SQL and Dataframes

References:

* Spark-SQL, <https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes>


# 5.1  Example Walkthrough
Follow the Spark SQL and Dataframes Examples below!

### Initialize PySpark

First, we use the findspark package to initialize PySpark.

In [4]:
# Initialize PySpark
APP_NAME = "PySpark Lecture Herget"
SPARK_MASTER="spark://mpp3r01c03s03.cos.lrz.de:7077"

# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
  #import findspark
  #findspark.init()
    import pyspark
    import pyspark.sql
    from pyspark.sql import Row
    conf=pyspark.SparkConf().setAppName(APP_NAME).set("spark.cores.max", "8")
    sc = pyspark.SparkContext(master=SPARK_MASTER, conf=conf)
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")

PySpark initiated...


### Hello, World!

Loading data, mapping it and collecting the records into RAM...

In [2]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("../data/example.csv")

# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))

# Collect the dataset into local RAM
data.collect()

[['Russell Jurney', 'Relato', 'CEO'],
 ['Florian Liebert', 'Mesosphere', 'CEO'],
 ['Don Brown', 'Rocana', 'CIO'],
 ['Steve Jobs', 'Apple', 'CEO'],
 ['Donald Trump', 'The Trump Organization', 'CEO'],
 ['Russell Jurney', 'Data Syndrome', 'Principal Consultant']]

### Creating Rows

Creating `pyspark.sql.Rows` out of your data so you can create DataFrames...

In [3]:
# Convert the CSV into a pyspark.sql.Row
def csv_to_row(line):
    parts = line.split(",")
    row = Row(
      name=parts[0],
      company=parts[1],
      title=parts[2]
    )
    return row

# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)

### Creating DataFrames from RDDs

Using the `RDD.toDF()` method to create a dataframe, registering the `DataFrame` as a temporary table with Spark SQL, and counting the jobs per person using Spark SQL.

In [4]:
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()

# Register the DataFrame for Spark SQL
rows_df.registerTempTable("executives")

# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
  name,
  COUNT(*) AS total
  FROM executives
  GROUP BY name
""")
job_counts.show()

# Go back to an RDD
job_counts.rdd.collect()

+---------------+-----+
|           name|total|
+---------------+-----+
|   Donald Trump|    1|
|Florian Liebert|    1|
|      Don Brown|    1|
| Russell Jurney|    2|
|     Steve Jobs|    1|
+---------------+-----+



[Row(name='Donald Trump', total=1),
 Row(name='Florian Liebert', total=1),
 Row(name='Don Brown', total=1),
 Row(name='Russell Jurney', total=2),
 Row(name='Steve Jobs', total=1)]

# 5.2-5.9 NASA DataSet

5.2 Create a Spark-SQL table with fields for IP/Host and Response Code from the NASA Log file! 

In [5]:
def nasa_split_http(line):
    parts = line.split(' ')
    if (len(parts) > 2):
        row = Row(
            ip=parts[0],
            http_response=parts[-2],
        )
        return row
    else:
        return {}
#199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
#row = Row(
#    ip=parts[0],
#    date=parts[3]+' '+parts[4],
#    http_method=parts[5].replace('\"', ''),
#    http_url=parts[6],
#    http_version=parts[7].replace('"',''),
#    http_response=parts[8],
#    response_time=parts[9]
#)

nasa = sc.textFile("../data/nasa/NASA_access_log_Jul95")
nasa_rows = nasa.flatMap(lambda line: line.split('\n')).map(nasa_split_http)
# Convert to a pyspark.sql.DataFrame
nasa_rows_df = nasa_rows.toDF()

# Register the DataFrame for Spark SQL
nasa_rows_df.registerTempTable("ip_response")

# Generate a new DataFrame with SQL using the SparkSession
job_show = spark.sql("""
SELECT
  *
  FROM ip_response
""")
job_show.show()

# Go back to an RDD
#job_show.rdd.collect()

+-------------+--------------------+
|http_response|                  ip|
+-------------+--------------------+
|          200|        199.72.81.55|
|          200|unicomp6.unicomp.net|
|          200|      199.120.110.21|
|          304|  burger.letters.com|
|          200|      199.120.110.21|
|          304|  burger.letters.com|
|          200|  burger.letters.com|
|          200|     205.212.115.106|
|          200|         d104.aa.net|
|          200|      129.94.144.152|
|          200|unicomp6.unicomp.net|
|          200|unicomp6.unicomp.net|
|          200|unicomp6.unicomp.net|
|          200|         d104.aa.net|
|          200|         d104.aa.net|
|          200|         d104.aa.net|
|          304|      129.94.144.152|
|          200|      199.120.110.21|
|          200|ppptky391.asahi-n...|
|          200|  net-1-141.eden.com|
+-------------+--------------------+
only showing top 20 rows



5.3 Run an SQL query that outputs the number of occurrences of each HTTP response code!

In [6]:
# Generate a new DataFrame with SQL using the SparkSession
job_show_53 = spark.sql("""
SELECT
  http_response, count(http_response)
  FROM ip_response
  GROUP BY http_response
""")
job_show_53.show()
 
# Go back to an RDD
job_show_53.rdd.collect()

+-------------+--------------------+
|http_response|count(http_response)|
+-------------+--------------------+
|          200|             1701534|
|         null|                   0|
|          302|               46573|
|          501|                  14|
|          404|               10845|
|          403|                  54|
|          500|                  62|
|          304|              132627|
|          400|                   5|
+-------------+--------------------+



[Row(http_response='200', count(http_response)=1701534),
 Row(http_response=None, count(http_response)=0),
 Row(http_response='302', count(http_response)=46573),
 Row(http_response='501', count(http_response)=14),
 Row(http_response='404', count(http_response)=10845),
 Row(http_response='403', count(http_response)=54),
 Row(http_response='500', count(http_response)=62),
 Row(http_response='304', count(http_response)=132627),
 Row(http_response='400', count(http_response)=5)]

5.4 Cachen Sie den Dataframe und führen Sie dieselbe Query nochmals aus! Messen Sie die Laufzeit für das Cachen und für die Ausführungszeit der Query!

In [10]:
import time
nasa_rows_df.cache()

start = time.time()
nasa_rows_df.cache()
end = time.time()
print("Cache time: ",end - start)

start = time.time()
job_show_53 = spark.sql("""
SELECT
  http_response, count(http_response)
  FROM ip_response
  GROUP BY http_response
""")
job_show_53.show()
end = time.time()
print("Query time: ",end - start)

Cache time:  0.002733469009399414
+-------------+--------------------+
|http_response|count(http_response)|
+-------------+--------------------+
|          200|             1701534|
|         null|                   0|
|          302|               46573|
|          501|                  14|
|          404|               10845|
|          403|                  54|
|          500|                  62|
|          304|              132627|
|          400|                   5|
+-------------+--------------------+

Query time:  25.073444604873657


5.5. Implement the same Query using the Dataframe API!

5.6 Führen Sie diesselbe Query mit/ohne Cache und 8, 16 Cores aus! Dokumentieren und erklären Sie das Ergebnis!

In [None]:
do not need

5.7 Performance Analysis: 
* Create RDDs with 2x, 4x, 8x and 16x of the size of the NASA log dataset! Persist the dataset in the Spark Cache! Use an appropriate number of cores (e.g. 8 or 16)!
* Measure and plot the response times for all datasets using a constant number of cores!
* Plot the results!
* Explain the results!



5.8 Strong Scaling

  * Measure the runtime for the query for 8, 16, 32, 64, 128, 256 cores for 1x and 16x datasets! Datasets cached in Memory!
  * Compute the speedup and efficiency!
  * Plot the responses!
  * Explain the results!

5.9 Convert the output to a Pandas dataframe and calculate the percentage of total for each response code!