<img src='https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/sparkSQL4.png' width="80%" height="80%"></img>

<img src='https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/sparkSQL2.png' width="80%" height="80%"></img>

<img src='https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/sparkSQL3.png' width="80%" height="80%"></img>

<img src='https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/sparkSQL1.png' width="80%" height="80%"></img>

# Spark SQL queries DataFrames, not RDDs

A data file on World Bank projects will be downloaded from GitHub, after removing any previous copy that may exist.

In [None]:
# In Jupyter notebooks you can prefix commands with ! to run shell commands
# here we remove any existing copy of the file we are going to download,
# then download the file

!rm -f world_bank.json.gz
!wget https://raw.githubusercontent.com/bradenrc/sparksql_pot/master/world_bank.json.gz

# Many other file types are supported, including text and Parquet

Here we create a DataFrame. Like an RDD it is a distributed collection of data, but it also carries a schema,
which is what allows it to be queried with SQL.

In [None]:
# You can load JSON, text and other files using spark.read
# unlike an RDD, this will attempt to create a schema around the data
# self-describing data such as JSON works really well for this

example1_df = spark.read.json("./world_bank.json.gz")
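By default `spark.read.json` expects JSON Lines (one complete JSON object per line) and decompresses `.gz` files based on the extension. To make schema inference concrete, here is a toy plain-Python sketch, not Spark's actual algorithm (which also merges conflicting types and handles nested structures). It writes a couple of made-up records as gzipped JSON Lines and collects every field name with the value types seen for it. A field missing from some records, like `borrower` here, still becomes part of the schema; Spark's inferred columns are nullable, so the missing values simply become null.

```python
import gzip
import json
import os
import tempfile

# Made-up sample records; the real world_bank.json.gz has many more fields
records = [
    {"id": "P1", "countryname": "Kenya", "totalamt": 100},
    {"id": "P2", "countryname": "Peru", "totalamt": 250, "borrower": "MOF"},
]

# Write them as JSON Lines (one JSON object per line), gzip-compressed
path = os.path.join(tempfile.mkdtemp(), "sample.json.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    for rec in records:
        f.write(json.dumps(rec) + "\n")

# Read them back, recording each field name and the Python types seen for it
schema = {}
with gzip.open(path, "rt", encoding="utf-8") as f:
    for line in f:
        for key, value in json.loads(line).items():
            schema.setdefault(key, set()).add(type(value).__name__)

for name in sorted(schema):
    print(name, sorted(schema[name]))
```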

In [None]:
# Spark SQL can infer the schema of JSON data and understand its structure
# once we have created the DataFrame, we can print the schema to see the shape of the data
# (printSchema prints directly and returns None, so it is not wrapped in print)

example1_df.printSchema()

# Let's take a look at the first two rows of data

The example below iterates over the result of the "take" command, which pulls 2 rows from the DataFrame back to the driver.
<br>A simple way to see the data is:<br>

##### copy and run the following code
    for row in example1_df.take(2):
        print(row)
        print("*" * 20)

In [None]:
for row in example1_df.take(2):
    print(row)
    print("*" * 20)

# Now let's register a temporary table: a name that points to the DataFrame and makes its data accessible via Spark SQL

##### copy and run the following code
    # Simply use the DataFrame object to create the table
    # (createOrReplaceTempView replaces the deprecated registerTempTable)
    example1_df.createOrReplaceTempView("world_bank")

In [None]:
# Simply use the DataFrame object to create the table
# (createOrReplaceTempView replaces the deprecated registerTempTable)
example1_df.createOrReplaceTempView("world_bank")

### The object returned by spark.sql is a DataFrame
##### copy and run the following code
    temp_df = spark.sql("select * from world_bank limit 2")

    print(type(temp_df))
    print("*" * 20)
    print(temp_df)

In [None]:
temp_df = spark.sql("select * from world_bank limit 2")

print(type(temp_df))
print("*" * 20)
print(temp_df)


#### One nice feature of notebooks is that we can display a small result as a table by converting it to Pandas
    spark.sql("select id, borrower from world_bank limit 2").toPandas()

In [None]:
spark.sql("select id, borrower from world_bank limit 2").toPandas()

### Here is a simple group by example:

#### Count the number of projects per country and list only the top 10

##### Copy and paste the following: 

    query = """
    select countryname,
    count(1) as projects
    from world_bank
    group by countryname
    order by projects desc
    limit 10
    """
    spark.sql(query).toPandas()


In [None]:
query = """
select countryname,
count(1) as projects
from world_bank
group by countryname
order by projects desc
limit 10
"""
spark.sql(query).toPandas()
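To experiment with the group-by logic without a cluster, the same query can be run against a tiny in-memory table using Python's built-in `sqlite3` module. This is only a stand-in for Spark SQL: the rows are made up, and `limit 2` replaces `limit 10` to keep the output short.

```python
import sqlite3

# In-memory database standing in for the registered world_bank table (made-up rows)
conn = sqlite3.connect(":memory:")
conn.execute("create table world_bank (id text, countryname text)")
conn.executemany(
    "insert into world_bank values (?, ?)",
    [("P1", "Kenya"), ("P2", "Peru"), ("P3", "Kenya"),
     ("P4", "Chile"), ("P5", "Kenya"), ("P6", "Peru")],
)

# Same shape as the Spark SQL query: count rows per country, largest first
query = """
select countryname,
count(1) as projects
from world_bank
group by countryname
order by projects desc
limit 2
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('Kenya', 3), ('Peru', 2)]
```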

### Simple Example of Adding a Schema (headers) to an RDD and using it as a dataframe

### In the example below a simple RDD is created with Random Data in two columns and an ID column.

#### copy and run the following code

    import random

    # first let's create a simple RDD

    # create a Python list of lists for our example: [ID, random value, its square]
    data_e2 = []
    for x in range(1, 6):
        random_int = random.randint(0, 9)
        # ** is exponentiation in Python; ^ would be bitwise XOR
        data_e2.append([x, random_int, random_int ** 2])

    # create the RDD with the random list of lists
    rdd_example2 = sc.parallelize(data_e2)
    print(rdd_example2.collect())


In [None]:
import random

# first let's create a simple RDD

# create a Python list of lists for our example: [ID, random value, its square]
data_e2 = []
for x in range(1, 6):
    random_int = random.randint(0, 9)
    # ** is exponentiation in Python; ^ would be bitwise XOR
    data_e2.append([x, random_int, random_int ** 2])

# create the RDD with the random list of lists
rdd_example2 = sc.parallelize(data_e2)
print(rdd_example2.collect())
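In Python, `^` is bitwise XOR, not exponentiation, so an expression like `random_int^2` does not square the value; `**` does. The plain-Python sketch below shows the difference and builds rows shaped like `data_e2` (ID, value, value squared). Seeding the generator is an addition, not part of the original notebook, so that the rows are the same on every run.

```python
import random

# In Python, ^ is bitwise XOR, not exponentiation
print(3 ^ 2)   # 1  (binary 11 XOR 10 = 01)
print(3 ** 2)  # 9

# Seed the generator so the "random" rows are reproducible
random.seed(42)

data_e2 = []
for x in range(1, 6):
    random_int = random.randint(0, 9)  # inclusive range 0..9
    data_e2.append([x, random_int, random_int ** 2])
print(data_e2)
```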

#### Now we can assign some header information

#### copy and run the following code
    from pyspark.sql.types import *

    # The schema is encoded in a string.
    schemaString = "ID VAL1 VAL2"

    # Every field is declared as a nullable string column
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD. Values are converted to str first, because
    # createDataFrame checks each value against its declared StringType
    schemaExample = spark.createDataFrame(rdd_example2.map(lambda row: [str(v) for v in row]), schema)

    # Register the DataFrame as a table.
    schemaExample.createOrReplaceTempView("example2")

    # Pull the data
    print(schemaExample.collect())



In [None]:
from pyspark.sql.types import *

# The schema is encoded in a string.
schemaString = "ID VAL1 VAL2"

# Every field is declared as a nullable string column
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD. Values are converted to str first, because
# createDataFrame checks each value against its declared StringType
schemaExample = spark.createDataFrame(rdd_example2.map(lambda row: [str(v) for v in row]), schema)

# Register the DataFrame as a table.
schemaExample.createOrReplaceTempView("example2")

# Pull the data
print(schemaExample.collect())
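The header-assignment step can be mimicked in plain Python: split the schema string into field names and zip them with each row. Because every field is declared as `StringType`, each value is converted with `str()`; PySpark's `createDataFrame` verifies values against the declared types by default and rejects an `int` in a string column. The rows below are hypothetical.

```python
# Hypothetical rows shaped like data_e2: [ID, VAL1, VAL2]
rows = [[1, 3, 9], [2, 7, 49]]

schemaString = "ID VAL1 VAL2"
field_names = schemaString.split()

# Each field is declared as a string, so every value goes through str()
records = [dict(zip(field_names, (str(v) for v in row))) for row in rows]
print(records)
# [{'ID': '1', 'VAL1': '3', 'VAL2': '9'}, {'ID': '2', 'VAL1': '7', 'VAL2': '49'}]
```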


#### We can also map rdd_example2 to Row objects (rdd_example3) and convert that to a DataFrame

##### copy and run this code
    from pyspark.sql import Row

    rdd_example3 = rdd_example2.map(lambda x: Row(id=x[0], val1=x[1], val2=x[2]))
    print(rdd_example3.collect())
    df_example3 = rdd_example3.toDF()

In [None]:
from pyspark.sql import Row

rdd_example3 = rdd_example2.map(lambda x: Row(id=x[0], val1=x[1], val2=x[2]))
print(rdd_example3.collect())
df_example3 = rdd_example3.toDF()

### Register this new DataFrame as a table
Register it as a temporary table called 'df_example3'

    df_example3.createOrReplaceTempView('df_example3')

In [None]:
df_example3.createOrReplaceTempView('df_example3')

# Another powerful feature is the ability to create functions and use them in SQL. Here is a simple example

First we create a function in Python, then register it so that we can call it from SQL

#### copy and run the following code
    def simple_function(v):
        return int(v * 10)

    # test the function
    print(simple_function(3))

In [None]:
def simple_function(v):
    return int(v * 10)

# test the function
print(simple_function(3))
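A Python UDF receives SQL NULLs as `None`, and with ANSI mode off, `cast(... as int)` yields NULL for a string that is not a valid integer. `simple_function` would then raise a `TypeError` and fail the query. A null-safe variant is sketched below; the name `safe_simple_function` is hypothetical, not part of the notebook.

```python
def simple_function(v):
    return int(v * 10)

def safe_simple_function(v):
    # SQL NULLs arrive in a Python UDF as None; pass them through instead of raising
    if v is None:
        return None
    return int(v * 10)

print(safe_simple_function(3))     # 30
print(safe_simple_function(None))  # None

try:
    simple_function(None)
except TypeError as exc:
    print("simple_function(None) fails:", exc)
```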

#### Now we can register the function for use in SQL, declaring that it returns an integer
    from pyspark.sql.types import IntegerType
    spark.udf.register('simple_function', simple_function, IntegerType())


In [None]:
from pyspark.sql.types import IntegerType
spark.udf.register('simple_function', simple_function, IntegerType())

#### VAL1 and VAL2 are strings (the schema declared every field as StringType), so we cast them to int before calling the function

    query = """
    select
        ID,
        VAL1,
        VAL2,
        simple_function(cast(VAL1 as int)) as s_VAL1,
        simple_function(cast(VAL2 as int)) as s_VAL2
    from
     example2
    """
    spark.sql(query).toPandas()

In [None]:
query = """
select
    ID,
    VAL1,
    VAL2,
    simple_function(cast(VAL1 as int)) as s_VAL1,
    simple_function(cast(VAL2 as int)) as s_VAL2
from
 example2
"""
spark.sql(query).toPandas()
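Registering a Python function and calling it from SQL is not unique to Spark. Python's built-in `sqlite3` offers a comparable `Connection.create_function(name, num_args, func)`, which makes it easy to try the same cast-then-call query locally. This is a stand-in for illustration, not Spark; the two rows are made up and stored as text, like the `example2` table.

```python
import sqlite3

def simple_function(v):
    return int(v * 10)

conn = sqlite3.connect(":memory:")
# Like the example2 table, every column holds text
conn.execute("create table example2 (ID text, VAL1 text, VAL2 text)")
conn.executemany("insert into example2 values (?, ?, ?)", [("1", "3", "9"), ("2", "7", "49")])

# sqlite3's counterpart to spark.udf.register: SQL name, argument count, Python callable
conn.create_function("simple_function", 1, simple_function)

rows = conn.execute(
    """
    select ID, VAL1, VAL2,
        simple_function(cast(VAL1 as integer)) as s_VAL1,
        simple_function(cast(VAL2 as integer)) as s_VAL2
    from example2
    order by ID
    """
).fetchall()
print(rows)  # [('1', '3', '9', 30, 90), ('2', '7', '49', 70, 490)]
```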

# Pandas & Seaborn Example
Pandas and Seaborn are common Python libraries for working with and visualizing data.

We can turn Pandas DataFrames into Spark DataFrames; the advantage of this 
could be scale, or the ability to run SQL statements against the data.

### copy and run the following code
    import seaborn as sns
    import pandas as pd
    print(pd.__version__)

In [None]:
import seaborn as sns
import pandas as pd
print(pd.__version__)

### First, let's grab some UFO data to play with

    !rm -f SIGHTINGS.csv
    !wget https://raw.githubusercontent.com/brianmangan/dsx-spark/master/SIGHTINGS.csv

In [None]:
!rm -f SIGHTINGS.csv
!wget https://raw.githubusercontent.com/brianmangan/dsx-spark/master/SIGHTINGS.csv

### Using the CSV file, we can create a Spark DataFrame:
    # note: despite its name, pandas_df holds a Spark DataFrame, not a Pandas one
    pandas_df = spark.read.format("csv").options(header="true").load("./SIGHTINGS.csv")
    pandas_df.limit(5).toPandas()

In [None]:
# note: despite its name, pandas_df holds a Spark DataFrame, not a Pandas one
pandas_df = spark.read.format("csv").options(header="true").load("./SIGHTINGS.csv")
pandas_df.limit(5).toPandas()
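With `header="true"`, Spark's CSV reader takes the column names from the first line and, unless `inferSchema="true"` is also set, reads every column as a string. Python's standard `csv.DictReader` behaves similarly, which makes it a handy way to peek at a CSV's layout. The sample rows below are made up; only the `datetime`, `state` and `date_posted` column names come from this notebook.

```python
import csv
import io

# Made-up extract; the real SIGHTINGS.csv may have more columns
sample = io.StringIO(
    "datetime,state,date_posted\n"
    "10/10/1999 20:30,ca,4/27/2004\n"
    "10/10/2000 21:00,tx,12/16/2005\n"
)

# The first line supplies the column names, much like options(header="true")
reader = csv.DictReader(sample)
rows = list(reader)
print(reader.fieldnames)  # ['datetime', 'state', 'date_posted']

# Every value comes back as a string, as with Spark's CSV reader without inferSchema
print(rows[0]["state"], type(rows[0]["state"]).__name__)  # ca str
```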

### Register a temp table, then look at the dataset: count sightings for each posting date
#### copy and paste this code

    ufo_data = pandas_df
    ufo_data.createOrReplaceTempView('ufo_data')
    
#### In another cell:


    query = """
    select count(*), date_posted 
    FROM
        ufo_data
    GROUP BY 
        date_posted
    limit 20

    """
    spark.sql(query).toPandas()


In [None]:
ufo_data = pandas_df
ufo_data.createOrReplaceTempView('ufo_data')

In [None]:
query = """
select count(*), date_posted 
FROM
    ufo_data
GROUP BY 
    date_posted
limit 20

"""
spark.sql(query).toPandas()
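The query above has no `order by`, so Spark is free to return any 20 of the groups, in any order; adding, for example, `order by count(*) desc` makes the result deterministic. Python's `collections.Counter` performs the same grouping and counting and can sort by count, as this sketch on made-up `date_posted` values shows.

```python
from collections import Counter

# Hypothetical date_posted values from a handful of sightings
date_posted = ["4/27/2004", "12/16/2005", "4/27/2004", "1/21/2008", "4/27/2004", "12/16/2005"]

# Equivalent of: select count(*), date_posted from ufo_data group by date_posted
counts = Counter(date_posted)

# most_common(n) sorts by count, like adding "order by count(*) desc limit n"
for posted, n in counts.most_common(20):
    print(n, posted)
```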

# Visualizing the Data
- Here are some simple ways to create charts using Pandas and Seaborn
- To display the charts in the notebook we tell matplotlib to render inline, and at the same time import the supporting libraries


In [None]:
%matplotlib inline 
import matplotlib.pyplot as plt, numpy as np

Pandas DataFrames have a "plot" method for creating charts.
Since most charts are built from aggregates, the result
set should be small enough to hold in Pandas.

We can take our UFO query from before and create a 
Pandas DataFrame from the resulting Spark DataFrame:

    ufos_df = spark.sql(query).toPandas()

In [None]:
ufos_df = spark.sql(query).toPandas()

To plot, we call the "plot" method and specify the chart type, the x and y columns,
and optionally the size of the chart.

Many more details can be found here:
http://pandas.pydata.org/pandas-docs/stable/visualization.html
    
    
#### copy and run this code
    ufos_df.plot(kind='bar', x='date_posted', y='count(1)', figsize=(6, 2))

In [None]:
ufos_df.plot(kind='bar', x='date_posted', y='count(1)', figsize=(6, 2))

### Count the observations across the entire dataset
    spark.sql("select count(*) from ufo_data").toPandas()

In [None]:
spark.sql("select count(*) from ufo_data").toPandas()

## Let's get a description and some info about the DataFrame
    # toPandas() collects every row to the driver, so use it only for data that fits in driver memory
    ufo_pandas = pandas_df.toPandas()
    ufo_pandas.describe()

#### In another Cell:
    ufo_pandas.info()


In [None]:
# toPandas() collects every row to the driver, so use it only for data that fits in driver memory
ufo_pandas = pandas_df.toPandas()
ufo_pandas.describe()

In [None]:
ufo_pandas.info()

## Density Plot
Plotting sightings by year with Seaborn, we can see that sightings peaked around the year 2000.

The steps we took: using the Pandas DataFrame, we parse the `datetime` column into timestamps, extract the year, and convert it to an integer for visualization.

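    # Parse the text timestamps; values that fail to parse become NaT instead of raising
    ufo_pandas['datetime'] = pd.to_datetime(ufo_pandas['datetime'], errors='coerce')
    ufo_pandas['year'] = ufo_pandas['datetime'].dt.year
    # Unparseable dates end up as year 0; they are excluded from the plot below
    ufo_pandas['year'] = ufo_pandas['year'].fillna(0).astype(int)

    plt.figure(figsize=(8, 4))
    # distplot is deprecated in recent Seaborn; histplot with kde=True draws a histogram plus density curve
    sns.histplot(ufo_pandas.loc[ufo_pandas['year'] > 0, 'year'], kde=True, stat='density')
    plt.xlim(1900, 2015)
    plt.show()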

In [None]:
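# Parse the text timestamps; values that fail to parse become NaT instead of raising
ufo_pandas['datetime'] = pd.to_datetime(ufo_pandas['datetime'], errors='coerce')
ufo_pandas['year'] = ufo_pandas['datetime'].dt.year
# Unparseable dates end up as year 0; they are excluded from the plot below
ufo_pandas['year'] = ufo_pandas['year'].fillna(0).astype(int)

plt.figure(figsize=(8, 4))
# distplot is deprecated in recent Seaborn; histplot with kde=True draws a histogram plus density curve
sns.histplot(ufo_pandas.loc[ufo_pandas['year'] > 0, 'year'], kde=True, stat='density')
plt.xlim(1900, 2015)
plt.show()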
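Filling unparseable years with 0 places those rows at year 0: outside the plotted 1900-2015 window, but still inside the data the histogram and density estimate are computed from. A plain-Python sketch of the alternative, dropping rows whose timestamp fails to parse, follows. The timestamp strings and the `%m/%d/%Y %H:%M` format are assumptions for illustration, not taken from SIGHTINGS.csv.

```python
from datetime import datetime

# Made-up timestamp strings; the format below is an assumption about the data
raw = ["10/10/1949 20:30", "not a date", "6/1/2000 22:15", "10/10/2004 24:00"]

def year_or_none(text, fmt="%m/%d/%Y %H:%M"):
    # Return the year, or None when the text does not parse (similar to errors='coerce' giving NaT)
    try:
        return datetime.strptime(text, fmt).year
    except ValueError:
        return None

years = [year_or_none(t) for t in raw]
print(years)  # [1949, None, 2000, None]  ("24:00" is not a valid %H hour)

# Drop unparseable rows instead of filling them with 0
valid_years = [y for y in years if y is not None]
print(valid_years)  # [1949, 2000]
```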

## Sightings by State
From this bar chart, we see that more than 500 UFO sightings were reported in California, and that 6 states reported more than 200 UFO sightings each.

    plt.figure(figsize=(16,8))
    sns.countplot(x="state", data=ufo_pandas, palette="Greens_d")
    plt.show()

In [None]:


plt.figure(figsize=(16,8))
sns.countplot(x="state", data=ufo_pandas, palette="Greens_d")
plt.show()
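Claims such as "more than 500 in California" are easier to confirm from a table of counts than by reading bar heights. In Pandas, `ufo_pandas['state'].value_counts()` returns the number of sightings per state, sorted from most to least. The same idea in plain Python, on hypothetical state values and with a threshold scaled down to fit the tiny sample, is:

```python
from collections import Counter

# Hypothetical values of the state column
states = ["ca", "tx", "ca", "fl", "ca", "tx", "wa"]

counts = Counter(states)
# Most to least frequent; ties keep the order in which states first appear
print(counts.most_common())  # [('ca', 3), ('tx', 2), ('fl', 1), ('wa', 1)]

# States with more than `threshold` sightings (500 or 200 in the text; 1 for this sample)
threshold = 1
print(sorted(state for state, n in counts.items() if n > threshold))  # ['ca', 'tx']
```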

