# 5. SQL and DataFrames

References:

* Spark-SQL, <https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes>


# 5.1 Example Walkthrough
Work through the Spark SQL and DataFrame examples below.

### Initialize PySpark

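First, we initialize PySpark. If `pyspark` cannot be imported in your environment, the `findspark` package (commented out below) can be used to locate the Spark installation first.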

In [1]:
# Initialize PySpark
APP_NAME = "PySpark Lecture"
SPARK_MASTER = "spark://mpp3r03c04s06.cos.lrz.de:7077"

# If there is no SparkSession yet, create the environment
try:
    sc and spark
except NameError:
    # Uncomment if pyspark is not on the Python path:
    # import findspark
    # findspark.init()
    import pyspark
    import pyspark.sql

    conf = pyspark.SparkConf().set("spark.cores.max", "4")
    sc = pyspark.SparkContext(master=SPARK_MASTER, appName=APP_NAME, conf=conf)
    # Build the session on top of the existing SparkContext
    spark = pyspark.sql.SparkSession.builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")

PySpark initiated...


### Hello, World!

Loading data, splitting each line into fields, and collecting the records into the driver's RAM...

In [2]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("../data/example.csv")

# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))

# Collect the dataset into local RAM
data.collect()

[[u'Russell Jurney', u'Relato', u'CEO'],
 [u'Florian Liebert', u'Mesosphere', u'CEO'],
 [u'Don Brown', u'Rocana', u'CIO'],
 [u'Steve Jobs', u'Apple', u'CEO'],
 [u'Donald Trump', u'The Trump Organization', u'CEO'],
 [u'Russell Jurney', u'Data Syndrome', u'Principal Consultant']]
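
The `split(",")` call above is sufficient for this file, but it would break a quoted field that itself contains a comma. A minimal sketch of a more tolerant parser, assuming the input follows the usual CSV quoting rules; `parse_csv_line` is a hypothetical helper name and not part of the original notebook:

```python
import csv

def parse_csv_line(line):
    """Split one CSV line into fields, honoring double-quoted fields."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    return next(csv.reader([line]))

# Plain line: same result as line.split(",")
print(parse_csv_line('Russell Jurney,Relato,CEO'))

# A quoted field containing a comma stays in one piece
print(parse_csv_line('"Jurney, Russell",Relato,CEO'))
```

Such a function could be passed to `csv_lines.map(parse_csv_line)` in place of the lambda.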

### Creating Rows

Creating `pyspark.sql.Row` objects out of your data so you can create DataFrames...

In [3]:
from pyspark.sql import Row

# Convert a CSV line into a pyspark.sql.Row
def csv_to_row(line):
    parts = line.split(",")
    row = Row(
        name=parts[0],
        company=parts[1],
        title=parts[2]
    )
    return row

# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)

### Creating DataFrames from RDDs

Using the `RDD.toDF()` method to create a `DataFrame`, registering the `DataFrame` as a temporary table (a temporary view in Spark 2.0+) with Spark SQL, and counting the jobs per person using Spark SQL.

In [4]:
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()

# Register the DataFrame as a temporary view for Spark SQL
# (registerTempTable is deprecated since Spark 2.0)
rows_df.createOrReplaceTempView("executives")

# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
  name,
  COUNT(*) AS total
FROM executives
GROUP BY name
""")
job_counts.show()

# Go back to an RDD
job_counts.rdd.collect()

+---------------+-----+
|           name|total|
+---------------+-----+
|   Donald Trump|    1|
|Florian Liebert|    1|
|      Don Brown|    1|
| Russell Jurney|    2|
|     Steve Jobs|    1|
+---------------+-----+



[Row(name=u'Donald Trump', total=1),
 Row(name=u'Florian Liebert', total=1),
 Row(name=u'Don Brown', total=1),
 Row(name=u'Russell Jurney', total=2),
 Row(name=u'Steve Jobs', total=1)]
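
The same aggregation can also be expressed with the DataFrame API instead of SQL. A minimal sketch, assuming a DataFrame with a `name` column such as `rows_df` above; `count_jobs_per_person` is a hypothetical helper name:

```python
def count_jobs_per_person(df):
    """Count rows per name, mirroring the SQL GROUP BY query."""
    # groupBy(...).count() produces a column named "count";
    # rename it to "total" to match the SQL alias
    return df.groupBy("name").count().withColumnRenamed("count", "total")

# Usage inside the notebook (requires the rows_df created earlier):
# count_jobs_per_person(rows_df).show()
```

Both variants are planned by the same Spark SQL engine, so the choice between them is mostly a matter of readability.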

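# 5.2/5.3 NASA Dataset

5.2 Create a Spark SQL table for the NASA log files! The following command converts the raw access log into a CSV file containing the quoted timestamp, the HTTP response code, and the number of bytes of each request: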

     cat /data/NASA_access_log_Jul95 |awk -F' ' '{print "\""$4 $5"\","$(NF-1)","$(NF)}' > nasa.csv
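
To make the field selection of the `awk` command easier to follow, here is a rough Python equivalent for a single line. It is only a sketch: `log_line_to_csv` is a hypothetical helper name, and the sample line is an assumed illustration of the Common Log Format used by the access log.

```python
def log_line_to_csv(line):
    """Mimic the awk command: quoted timestamp, response code, bytes."""
    # Like awk's default field splitting, split() breaks on runs of whitespace
    fields = line.split()
    # $4 and $5 are the two halves of the bracketed timestamp
    timestamp = fields[3] + fields[4]
    # $(NF-1) is the HTTP response code, $(NF) the number of bytes
    return '"{}",{},{}'.format(timestamp, fields[-2], fields[-1])

# Assumed sample line for illustration
sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(log_line_to_csv(sample))
# prints "[01/Jul/1995:00:00:01-0400]",200,6245
```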


5.3 Run an SQL query that outputs the number of occurrences of each HTTP response code!
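
One possible approach to 5.2 and 5.3 is sketched below; it is not the only solution. It assumes the `spark` session from the walkthrough and the `nasa.csv` file produced by the command above. The column names (`timestamp`, `response_code`, `bytes`), the view name `nasa_log`, and the helper name `count_response_codes` are assumptions chosen for illustration.

```python
# SQL for 5.3: number of occurrences of each HTTP response code
RESPONSE_CODE_QUERY = """
SELECT
  response_code,
  COUNT(*) AS total
FROM nasa_log
GROUP BY response_code
ORDER BY total DESC
"""

def count_response_codes(spark, csv_path="nasa.csv"):
    """Load the CSV, register it as a temporary view, and run the query."""
    # The CSV has no header row, so name the three columns explicitly
    log_df = spark.read.csv(csv_path).toDF("timestamp", "response_code", "bytes")
    # 5.2: expose the DataFrame to Spark SQL under the name nasa_log
    log_df.createOrReplaceTempView("nasa_log")
    # 5.3: aggregate with SQL and return the resulting DataFrame
    return spark.sql(RESPONSE_CODE_QUERY)

# Usage inside the notebook (requires the spark session created earlier):
# count_response_codes(spark).show()
```

Without an explicit schema all columns are read as strings, which is sufficient for grouping by response code.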