# Simple Data Analysis

While processing data, you will often want to get some quick insights into the data at hand. For this we can make use of Spark SQL.

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

sqlContext = SQLContext(sc)

## Loading the data
Let's say we want to get insights into the daily_weather_sales_per_state view we generated earlier. We stored that view in CSV format at /data/views/daily_weather_sales_per_state. You might also remember how awkward reading CSV-formatted data was previously: we had to read the data line by line, split each line, convert the values into the format we wanted to work with, and we still ended up with one big array.

Well, the people at Databricks helped us out a bit by providing the spark-csv package. We can use this package to read CSV-formatted data as a DataFrame. Once we have the data loaded, we can do all kinds of things with it.

The first step when using spark-csv is to describe the schema of the data. When working with CSV-formatted data, there might be a header available as the first line holding the column names. But since our data has been generated in a distributed way, there is no knowing what the actual column names are. So in order to load the data we have to describe it first.

A custom schema, or StructType, can be created by passing it a list of StructFields. A StructField describes a single column of the dataset: a field name, a field type and a boolean indicating whether the field value may be *null*.

In fact, a schema could look like this:

In [None]:
# Line continuations are not needed inside the brackets.
mySchema = StructType([
    StructField("field_1", StringType(), True),
    StructField("field_2", IntegerType(), True),
    # ... one StructField per remaining column ...
    StructField("field_n", FloatType(), True)
])

So let us define the schema for our data and store it in a variable called **sales_schema**.

In [None]:
sales_schema = StructType([ \
    StructField("key", StringType(), True), \
    StructField("customer_age", IntegerType(), True), \
    StructField("customer_gender", StringType(), True), \
    StructField("customer_key", IntegerType(), True), \
    StructField("customer_marital_status", StringType(), True), \
    StructField("customer_name", StringType(), True), \
    StructField("customer_state", StringType(), True), \
    StructField("date", StringType(), True), \
    StructField("employee_gender", StringType(), True), \
    StructField("employee_job_title", StringType(), True), \
    StructField("employee_key", IntegerType(), True), \
    StructField("employee_name", StringType(), True), \
    StructField("employee_state", StringType(), True), \
    StructField("price", FloatType(), True), \
    StructField("product_category", StringType(), True), \
    StructField("product_department", StringType(), True), \
    StructField("product_description", StringType(), True), \
    StructField("product_key", IntegerType(), True), \
    StructField("product_price", FloatType(), True), \
    StructField("product_version", IntegerType(), True), \
    StructField("quantity", FloatType(), True), \
    StructField("store_key", IntegerType(), True), \
    StructField("store_name", StringType(), True), \
    StructField("store_state", StringType(), True), \
    StructField("tender_type", StringType(), True), \
    StructField("transaction", StringType(), True), \
    StructField("transaction_type", StringType(), True), \
    StructField("rainfall", FloatType(), True), \
    StructField("temp_avg", FloatType(), True), \
    StructField("temp_max", FloatType(), True), \
    StructField("temp_min", FloatType(), True) \
])

That was quite a bit of code, but luckily you only have to write it once. We now have the schema, but we have not loaded the data yet. The **sqlContext** has a **sqlContext.read.load(path, options)** function we can use to load the data, but it does not use the CSV format by default. We can change the way the data is read by calling the optional **format(_format_)** function before calling **load(path, options)**. The format we will use is called **com.databricks.spark.csv**.

And then there is the schema, which we need to provide when loading the data. You can do so by passing **schema=sales_schema** as one of the options of the load function.

Let us create a new DataFrame with the contents of the CSV file. Once the DataFrame has been created you can call **take(_n_)** to display the first n records, just like you did with an RDD.

In [None]:
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .load('/data/views/daily_weather_sales_per_state', schema = sales_schema)
df.take(1)

Wicked! We created our first DataFrame!
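
If you are curious what a schema does for us conceptually, here is a rough analogy in plain Python. It is only a sketch and not how spark-csv is implemented: the three-column schema, the `apply_schema` helper and the sample rows are all made up for illustration. Each entry plays the role of a StructField (name, type, nullable) and tells the reader how to turn a raw string into a typed value.

```python
import csv
import io

# Hypothetical three-column schema: (name, converter, nullable).
# This mirrors the idea of StructField(name, type, nullable); it is not Spark code.
schema = [
    ("store_key", int, True),
    ("store_name", str, True),
    ("price", float, True),
]

def apply_schema(row, schema):
    """Convert a list of raw strings into a dict of typed values."""
    record = {}
    for raw, (name, convert, nullable) in zip(row, schema):
        if raw == "":
            # An empty field becomes None, but only if the column is nullable.
            if not nullable:
                raise ValueError("column %s may not be null" % name)
            record[name] = None
        else:
            record[name] = convert(raw)
    return record

# Made-up sample data without a header line, like our generated view.
sample = io.StringIO("12,Downtown,19.99\n7,Harbour,\n")
records = [apply_schema(row, schema) for row in csv.reader(sample)]
print(records)
```

The second sample row has an empty price, which ends up as `None` because the column is marked as nullable.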

## Working with data
Let's start simple by showing the top 5 stores with the most sales in our DataFrame. This requires us to group our data by store_key and count the sales records for each store. After that we sort the results in descending order.

In [None]:
high_performing_stores = df \
    .groupBy('store_key') \
    .count() \
    .sort(desc('count'))
    
high_performing_stores.take(5)
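
The query above is a "top N by frequency" pattern: group, count, sort descending, take the first few. As a minimal sketch of what it computes, here is the same idea in plain Python on a handful of made-up store keys (the values are hypothetical, not taken from our view).

```python
from collections import Counter

# Made-up store_key values, one per sales record.
store_keys = [3, 1, 3, 2, 3, 1, 4, 3, 1]

# Counter tallies the records per store; most_common(n) returns the
# n largest tallies in descending order of count.
top_stores = Counter(store_keys).most_common(2)
print(top_stores)  # [(3, 4), (1, 3)]
```

Unlike this sketch, the DataFrame version is evaluated by Spark across the cluster, so it also works when the data does not fit in the memory of a single machine.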

Ok, let's move to something more difficult. We have a full collection of transactions, but we don't know the total value of a single transaction. So let us add a column to our DataFrame to calculate the transaction total. DataFrame has a function called **withColumn(_name_, _expression_)** that can help us accomplish this. In the same cell we also add a customer_age_bucket column (the customer's age divided by 10, rounded up), which we will use later on when we look at age ranges.

In [None]:
transaction_totals = df \
    .withColumn('total', df.price * df.quantity) \
    .select('date', 'transaction', 'customer_key', 'customer_marital_status', 'customer_age', 'customer_gender', 'total')

transaction_totals = transaction_totals \
    .withColumn('customer_age_bucket', ceil(transaction_totals.customer_age / 10))
    
transaction_totals.take(1)

Now that we know the value of a single transaction, we would like to know the total value of the goods bought by a customer on a single day. For this we can group our transaction totals by date and customer and calculate the sum of all totals.

If you want to do more than one aggregation, you can make use of the DataFrame's **agg(_expressions_)** function. You can pass columns or column expressions to the _expressions_ argument. Common column expressions are **sum(_field_)**, **count(_field_)**, **avg(_field_)**, **min(_field_)** and **max(_field_)**.

In [None]:
tt = transaction_totals \
    .groupBy('date', 'customer_key', 'customer_marital_status', 'customer_age', 'customer_age_bucket', 'customer_gender') \
    .agg(sum('total').alias('total'))
    
tt.take(1)
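
To see what grouping on more than one column does, here is a small plain-Python sketch of the same aggregation. The rows are made up, and only date and customer_key are used as the grouping key to keep things short. Every distinct (date, customer_key) combination becomes one "visit" with its own running total.

```python
from collections import defaultdict

# Made-up rows: (date, customer_key, total) for individual transactions.
transactions = [
    ("2015-01-01", 101, 10.0),
    ("2015-01-01", 101, 5.5),
    ("2015-01-01", 202, 3.0),
    ("2015-01-02", 101, 7.25),
]

# The tuple (date, customer_key) acts as the composite grouping key.
visit_totals = defaultdict(float)
for date, customer_key, total in transactions:
    visit_totals[(date, customer_key)] += total

print(dict(visit_totals))
```

Customer 101 bought twice on the first day, so those two transactions collapse into a single visit worth 15.5.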

### Getting some customer information
With the transactions and the transaction totals we can build a picture of our customers. Let's start by figuring out the average amount spent by a customer on each visit, and let's group that by customer_marital_status to see if customers with a certain marital status buy more of our goods.

In [None]:
marital_status_visits = tt \
    .groupBy('customer_marital_status') \
    .agg( \
        sum('total').alias('total'), \
        count('customer_key').alias('visits'), \
        avg('total').alias('avg_per_visit') \
    ) \
    .sort(desc('avg_per_visit'))
    
marital_status_visits.show()

Well done! Let's do the same thing again, but this time we group by customer_gender.

In [None]:
gender_visits = tt \
    .groupBy('customer_gender') \
    .agg( \
        sum('total').alias('total'), \
        count('customer_key').alias('visits'), \
        avg('total').alias('avg_per_visit') \
    ) \
    .sort(desc('avg_per_visit'))
    
gender_visits.show()
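
The three aggregates we keep computing are related: as long as there are no null values involved, avg_per_visit is simply total divided by visits. The following plain-Python sketch shows this on a few made-up per-visit totals (the data is hypothetical and only meant to illustrate the arithmetic).

```python
# Made-up per-visit totals keyed by customer gender.
visits = [("F", 20.0), ("M", 10.0), ("F", 40.0), ("M", 30.0), ("F", 30.0)]

# Accumulate the total amount and the number of visits per group.
stats = {}
for gender, total in visits:
    entry = stats.setdefault(gender, {"total": 0.0, "visits": 0})
    entry["total"] += total
    entry["visits"] += 1

# The average per visit follows from the other two aggregates.
for entry in stats.values():
    entry["avg_per_visit"] = entry["total"] / entry["visits"]

print(stats)
```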

And since we also have the customer_age, it would be nice to see what age our customers are. But here is the catch: we want to group by age ranges instead of the actual age of a customer.

In [None]:
transaction_totals_with_age_bucket = transaction_totals \
    .withColumn('customer_age_bucket', ceil(transaction_totals.customer_age / 10))
    
tt_with_age_bucket = transaction_totals_with_age_bucket \
    .groupBy('date', 'customer_key', 'customer_marital_status', 'customer_age', 'customer_age_bucket', 'customer_gender') \
    .agg(sum('total').alias('total'))
    
age_visits = tt_with_age_bucket \
    .groupBy('customer_age_bucket') \
    .agg( \
        sum('total').alias('total'), \
        count('customer_key').alias('visits'), \
        avg('total').alias('avg_per_visit') \
    ) \
    .sort(desc('customer_age_bucket'))
    
age_visits.show()
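
The bucket numbers in the output are easier to read once you know which ages they cover. Below is a small plain-Python sketch of the same formula, ceil(age / 10). The helper names `age_bucket` and `bucket_range` are made up for this illustration, and the range calculation assumes ages of 1 and up.

```python
import math

def age_bucket(age):
    """Bucket number, computed the same way as ceil(customer_age / 10)."""
    return int(math.ceil(age / 10.0))

def bucket_range(bucket):
    """Inclusive age range covered by a bucket number (for ages >= 1)."""
    return (bucket - 1) * 10 + 1, bucket * 10

for age in (20, 21, 29, 30, 31):
    bucket = age_bucket(age)
    low, high = bucket_range(bucket)
    print("%d -> bucket %d (ages %d to %d)" % (age, bucket, low, high))
```

Note that a round age such as 30 falls into the same bucket as 21 to 29, because rounding up puts the upper boundary inside the bucket.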

### Creating customer profiles
Let's group everything we know about our customers to see which customer segment is generating the most turnover.

In [None]:
customer_segments = tt \
    .groupBy(tt.customer_gender, tt.customer_marital_status, tt.customer_age_bucket) \
    .agg( \
        sum('total').alias('total'), \
        count('customer_key').alias('visits'), \
        avg('total').alias('avg_per_visit') \
    ) \
    .sort(desc('total'))
    
customer_segments.show()

## Storing our results
DataFrames can also be stored to HDFS using several different output formats. The DataFrame provides a **write** member with functions for storing as json, orc or parquet. In our case we will use parquet to store the customer_segments as a new view.

In [None]:
customer_segments.write.parquet('/data/views/customer_segments')