In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
from functools import wraps
import time

# Introducing the DataFrames API
In Spark, a DataFrame object consists of [Row](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Row.html) objects and [Column](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html) objects. Concretely, each row of a Spark DataFrame  is an instance of the ```pyspark.sql.Row``` while each column is an instance of the ```pyspark.sql.Column``` class. We will look at  each of these classes in detail.

## Creating DataFrames
1. From Python objects
2. External data sources
3. Other Spark objects

### Schemas
Also, when creating DataFrames, you have the option to use a schema or not. A schema in Spark defines the column names and associated data types for a DataFrame. Most often, schemas come into play when you are reading structured data from an external data source. When a schema is not used, Spark has to infer the data types, which can slow your application if you have a massive dataset. Although schemas are a concept borrowed from database management systems (DBMS), they offer several advantages when dealing with large datasets:
- Spark doesn't have to infer data types, so you get speed benefits.
- Without a schema, Spark creates a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming. As such, defining a schema will avoid this.
- You can detect errors early if data doesn’t match the schema.
#### Defining Schemas
- Programmatically using Spark DataTypes 
- Using Data Definition Language (DDLs)

### Spark DataFrame from Python objects

In [None]:
# Option 1: build the schema programmatically from Spark DataTypes.
# Every field is declared as (column name, data type, nullable=False).
schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("author_name", StringType()),
            ("book_title", StringType()),
            ("num_pages", IntegerType()),
        )
    ]
)

# Option 2: describe the same columns with a DDL string.
# This assignment replaces the StructType bound to `schema` just above.
schema = "author_name STRING, book_title STRING, num_pages INT"

In [None]:
# DDL schema for the blog dataset, one column per line.
# Adjacent string literals are concatenated by Python into a single string.
schema = (
    "`Id` INT, "
    "`First` STRING, "
    "`Last` STRING, "
    "`Url` STRING,"
    "`Published` STRING, "
    "`Hits` INT, "
    "`Campaigns` ARRAY<STRING>"
)

In [None]:
# A small, static sample dataset.
# In real life we often receive a lot of data in the form of Python objects
# (for instance, data downloaded from websites) and want to turn it into a
# Spark DataFrame.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]

# Create a SparkSession running locally, with the driver bound to 127.0.0.1
spark = (
    SparkSession.builder
    .appName("intro")
    .master("local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Create a DataFrame from the Python objects using the `schema` defined earlier
sdf = spark.createDataFrame(data, schema)

# Show the DataFrame; it should reflect the data above
sdf.show()

# Print the schema used by Spark to process the DataFrame.
# printSchema() writes to stdout itself and returns None, so wrapping it in
# print() would emit an extra "None" line.
sdf.printSchema()

### EXERCISE-1: READ CSV WITH SCHEMA
1. Use the Spark documentation to find out how to read from a file with a defined schema. 
Note that the schema is the one we already defined above. The data above has been saved as ```blog_simple_dataset.csv```. Read it as a Spark DataFrame with a schema. Answer this question in the next cell.
2. Define a schema for ```activity_raw_data.csv```; use a string for the datetime column.
3. Load the dataset with and without schema using the functions defined below. Compare the loading times. Answer this question by completing the functions defined below and calling them.

### Spark DataFrame from external data sources
The most common way (which we have already seen) is to load data from external data sources, and 
Spark supports numerous data stores. Spark reads data through the ```DataFrameReader``` object. Please look at the documentation [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html) to see all data sources that the Spark ```DataFrameReader``` object supports.

In [None]:
# Render the image file "SparkConnectors.png" as this cell's output.
# NOTE(review): the relative file name is presumably resolved against the
# notebook's working directory -- confirm the PNG ships next to the notebook.
from IPython.display import Image
# The Image object is the last expression of the cell, so Jupyter displays it.
Image("SparkConnectors.png")

In [None]:
def timefn(fn):
    """
    Decorator that reports the running time of every call to ``fn``.

    The wrapped function is called with the arguments it was given and its
    return value is passed back unchanged. After each call a line of the form
    ``@timefn:<function name> took <elapsed> seconds`` is printed to stdout.
    ``functools.wraps`` keeps the name and docstring of ``fn`` on the wrapper.

    :param fn: the callable to time
    :return: a wrapper with the same call signature as ``fn``
    """
    @wraps(fn)
    def measure_time(*args, **kwargs):
        # perf_counter() is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() is wall-clock time and can jump
        # if the system clock is adjusted while fn is running.
        t1 = time.perf_counter()
        result = fn(*args, **kwargs)
        t2 = time.perf_counter()
        print("@timefn:" + fn.__name__ + " took " + str(t2 - t1) + " seconds")
        return result
    return measure_time

In [None]:
@timefn
def load_with_schema(large_csv):
    """
    EXERCISE TEMPLATE: read ``large_csv`` into a Spark DataFrame using an
    explicitly defined schema.

    The two ``YOUR CODE HERE`` placeholders are not valid Python; this cell
    cannot run until both have been replaced. Each call is timed by the
    ``@timefn`` decorator, which prints the elapsed time.

    :param large_csv: path of the CSV file to load
    """
    # Define the schema here.
    # You can load part of the file with pandas (just a few rows)
    # to remind yourself of the data types.
    schema = YOUR CODE HERE 
    spark = SparkSession.builder.master("local[*]").appName("ReadWithChema").getOrCreate()
    # Now read the data using the schema defined above.
    # NOTE: as written nothing is returned; `sdf` stays local to this function.
    sdf = YOUR CODE HERE 

In [None]:
@timefn
def load_without_schema(large_csv):
    """
    Read ``large_csv`` into a Spark DataFrame and let Spark infer the column types.

    This is the counterpart of ``load_with_schema``: each call is timed by the
    ``@timefn`` decorator so that the cost of schema inference can be compared
    with the cost of loading with a user-supplied schema.

    :param large_csv: path of the CSV file to load (first line is the header)
    """
    spark = SparkSession.builder.master("local[*]").appName("DataFrameFromPythonObj").getOrCreate()
    # inferSchema=True makes Spark scan the data to work out the column types.
    # Without it every column is simply read as a string and no inference
    # happens, so there would be nothing to compare against load_with_schema.
    sdf = spark.read.csv(large_csv, header=True, inferSchema=True)

In [None]:
# Time the load that uses an explicit schema.
# NOTE: this is an absolute, machine-specific path; point it at your own copy of the file.
load_with_schema(large_csv="/Users/dmatekenya/Desktop/TMP/data/activity_log_raw.csv")

In [None]:
# Time the load that does not use a user-supplied schema.
# NOTE: this is an absolute, machine-specific path; point it at your own copy of the file.
load_without_schema(large_csv="/Users/dmatekenya/Desktop/TMP/data/activity_log_raw.csv")

## Columns and Expressions in  DataFrames
In Spark DataFrames, columns behave like pandas DataFrame columns in several ways, but they also behave differently in others. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. 
- [Column](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html) is the name of the object, which has many important methods such as `alias()` and `cast()`, while [col()](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.col.html) is a standard built-in function that returns a Column.

We need to use the col() and expr() functions available in `pyspark.sql.functions` for many operations such as:
- Add, rename columns
- Subset data based on columns
- Access columns to compute stats on them
-  Access columns to compute operations on them such as sorting

### Add a new column using expr and col
In order to add a new column in a Spark DataFrame, we use the ```DataFrame.withColumn(new_col_name, expression_to_compute_new_col)```

In [None]:
from pyspark.sql.functions import *

In [None]:
# Location of the raw activity log (absolute path on the author's machine).
csv_fpath = "/Users/dmatekenya/Desktop/TMP/data/activity_log_raw.csv"

# Get a local SparkSession.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrameFromPythonObj")
    .getOrCreate()
)

# Read the CSV, treating its first line as the header row.
sdf = spark.read.option("header", True).csv(csv_fpath)

In [None]:
# Add a boolean column computed from a SQL expression string via expr().
sdf2 = sdf.withColumn(colName="new_col", col=expr("ACTIVITY_ID > 10000"))
sdf2.show()

In [None]:
# Same new column, this time built with col() and a Python comparison
# (the author's preferred alternative to expr()).
sdf2 = sdf.withColumn(colName="new_col", col=(col("ACTIVITY_ID") > 10000))
sdf2.show()

### Subset data  based on a few columns
In order to access a single or multiple columns, we use the ```select()``` function on the DataFrame

In [None]:
# Keep only the two columns of interest; select() also accepts a list of names.
sdf3 = sdf.select(["ACTIVITY_TIME", "STATUS"])
sdf3.show()

**EXERCISE-2:**

1. Check  if these statements: df.select(expr("ACTIVITY_TIME")).show(2), df.select(col("ACTIVITY_TIME")).show(2)
and df.select("ACTIVITY_TIME").show(2) will provide  the same output. Replace df with name of your Spark DataFrame.

2. Create a new DataFrame using expr to get only those rows where STATUS is "S".
Note that expr() just performs the operation; it doesn't filter out the rows which evaluate to false.
3. Sort DataFrame: use the col function to sort the DataFrame on the "SID" column.

In [None]:
# YOUR CODE

In [None]:
# Order the rows by the SID column and display the result.
sdf.orderBy(col("SID")).show()

### Rows
A row in Spark is a generic Row object, containing one or more columns. Each column may be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.). Because Row is an object in Spark and an ordered collection of fields, you can instantiate a Row the same way we instantiate any object. Consequently, you can collect Row objects in a list and create a Spark DataFrame.

In [None]:
# Row lives in pyspark.sql; this is the same class the top-level package re-exports.
from pyspark.sql import Row

# A Row is instantiated like any other object, with its fields given as keyword arguments.
row = Row(name="Alice", age=11)

In [None]:
# Build one Row per (name, state) pair, then create a DataFrame from the list of Rows.
rows = [
    Row(name=person, state=us_state)
    for person, us_state in [("Matei Zaharia", "CA"), ("Reynold Xin", "CA")]
]
spark_df_from_rows = spark.createDataFrame(rows)
spark_df_from_rows.show()

**EXERCISE-3:** Creating a Spark DataFrame with Rows. Please complete the function below and call it.

In [None]:
def convert_json_to_spark_with_rows(json_file):
    """
    EXERCISE TEMPLATE: build a Spark DataFrame from a list of Row objects,
    one Row per record, and display it.

    Every ``YOUR CODE`` placeholder must be replaced before this cell can run;
    as written it is not valid Python.

    NOTE(review): the loop below iterates over ``df``, which is never defined
    inside this function, and ``json_file`` is never read. Presumably ``df`` is
    meant to be a pandas DataFrame loaded from ``json_file`` -- confirm.

    :param json_file: path of the JSON file to convert
    """
    # create a list to hold all Row objects
    rows = YOUR CODE
    for idx, row in df.iterrows():
        # get lon and lat from the 'coord' column using dict key access
        x = row['coord']['lon']       
        y = row['coord']['lat']
        # create the Row object here
        srow = YOUR CODE
        
        # append this Row object to the list
        YOUR CODE
    
    # When creating a Spark DataFrame this way, it's better to use a schema to avoid trouble.
    # Create a schema for this data here; use DOUBLE as the data type for lon and lat.
    schema = YOUR CODE
    
    # use spark.createDataFrame() here
    # if you get errors, use the option verifySchema=False
    spark_df = YOUR CODE
    
    # use a show() statement to show the DataFrame
    # use show() with print to ensure we see the outputs
    YOUR CODE

In [None]:
# Relative path of the city list used in EXERCISE-3.
jsonfile = "../data/city.list.json"
convert_json_to_spark_with_rows(json_file=jsonfile)

# Common DataFrames Operations

In [None]:
# TO BE CONTINUED

In [None]:
# Raw activity log (absolute path on the author's machine).
csv_fpath = "/Users/dmatekenya/Desktop/TMP/data/activity_log_raw.csv"

# Get a local SparkSession.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrameFromPythonObj")
    .getOrCreate()
)

# Load the CSV with its first line used as column names.
sdf = spark.read.options(header=True).csv(csv_fpath)

In [None]:
# Project the DataFrame down to the activity time and status columns.
sdf2 = sdf.select(["ACTIVITY_TIME", "STATUS"])
sdf2.show()

In [None]:
# Same two-column projection, bound to sdf3 (not displayed here).
sdf3 = sdf.select(["ACTIVITY_TIME", "STATUS"])