# Dataframes

* Dataframes are a restricted sub-type of RDDs. 
* Restircing the type allows for more optimization.

* Dataframes store two dimensional data, similar to the type of data stored in a spreadsheet. 
   * Each column in a dataframe can have a different type.
   * Each row contains a `record`.

* Similar to, but not the same as, [pandas dataframes](http://pandas.pydata.org/pandas-docs/stable/dsintro.html#dataframe) and [R dataframes](http://www.r-tutor.com/r-introduction/data-frame)

In [5]:
import os
import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

### Constructing a DataFrame from an RDD of Rows
Each Row defines it's own  fields, the schema is *inferred*.

In [7]:
# One way to create a DataFrame is to first define an RDD from a list of Rows 
some_rdd = sc.parallelize([Row(name=u"John", age=19),
                           Row(name=u"Smith", age=23),
                           Row(name=u"Sarah", age=18)])
some_rdd.collect()

In [8]:
# The DataFrame is created from the RDD or Rows
# Infer schema from the first row, create a DataFrame and print the schema
some_df = spark.createDataFrame(some_rdd)
some_df.printSchema()
some_df.show()

In [9]:
# A dataframe is an RDD of rows plus information on the schema.
# performing **collect()* on either the RDD or the DataFrame gives the same result.
print(type(some_rdd),type(some_df))
print('some_df =',some_df.collect())
print('some_rdd=',some_rdd.collect())
some_df.show()

### Defining the Schema explicitly
The advantage of creating a DataFrame using a pre-defined schema allows the content of the RDD to be simple tuples, rather than rows.

In [11]:
# In this case we create the dataframe from an RDD of tuples (rather than Rows) and provide the schema explicitly
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
# Schema with two fields - person_name and person_age
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])

# Create a DataFrame by applying the schema to the RDD and print the schema
another_df = spark.createDataFrame(another_rdd, schema)
another_df.printSchema()
another_df.show()
# root
#  |-- age: binteger (nullable = true)
#  |-- name: string (nullable = true)

## Loading DataFrames from disk
There are many maethods to load DataFrames from Disk. Here we will discuss three of these methods
1. Parquet
2. JSON (on your own)
3. CSV  (on your own)

In addition, there are API's for connecting Spark to an external database. We will not discuss this type of connection in this class.

### Loading dataframes from JSON files
[JSON](http://www.json.org/) is a very popular readable file format for storing structured data.
Among it's many uses are **twitter**, `javascript` communication packets, and many others. In fact this notebook file (with the extension `.ipynb` is in json format. JSON can also be used to store tabular data and can be easily loaded into a dataframe.

In [14]:
!wget 'https://mas-dse-open.s3.amazonaws.com/Moby-Dick.txt' -P ../../Data/

In [15]:
# when loading json files you can specify either a single file or a directory containing many json files.
path = "/FileStore/tables/people.json"

# Create a DataFrame from the file(s) pointed to by path
people = spark.read.json(path)
print('people is a',type(people))
# The inferred schema can be visualized using the printSchema() method.
people.show()

people.printSchema()

### Excercise: Loading csv files into dataframes

Spark 2.0 includes a facility for reading csv files. In this excercise you are to create similar functionality using your own code.

You are to write a class called `csv_reader` which has the following methods:

* `__init__(self,filepath):` recieves as input the path to a csv file. It throws an exeption `NoSuchFile` if the file does not exist.
* `Infer_Schema()` opens the file, reads the first 10 lines (or less if the file is shorter), and infers the schema. The first line of the csv file defines the column names. The following lines should have the same number of columns and all of the elements of the column should be of the same type. The only types allowd are `int`,`float`,`string`. The method infers the types of the columns, checks that they are consistent, and defines a dataframe schema of the form:
```python
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])
```
If everything checks out, the method defines a `self.` variable that stores the schema and returns the schema as it's output. If an error is found an exception `BadCsvFormat` is raised.
* `read_DataFrame()`: reads the file, parses it and creates a dataframe using the inferred schema. If one of the lines beyond the first 10 (i.e. a line that was not read by `InferSchema`) is not parsed correctly, the line is not added to the Dataframe. Instead, it is added to an RDD called `bad_lines`.
The methods returns the dateFrame and the `bad_lines` RDD.

### Parquet files

* [Parquet](http://parquet.apache.org/) is a popular columnar format.

* Spark SQL allows [SQL](https://en.wikipedia.org/wiki/SQL) queries to retrieve a subset of the rows without reading the whole file.

* Compatible with HDFS : allows parallel retrieval on a cluster.

* Parquet compresses the data in each column.

### Spark and Hive
* Parquet is a **file format** not an independent database server.
* Spark can work with the [Hive](https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started) relational database system that supports the full array of database operations.
* Hive is compatible with HDFS.

In [23]:
parquet_file="/FileStore/tables/users.parquet"

In [24]:
#load a Parquet file
print(parquet_file)
df = spark.read.load(parquet_file)
df.show()

In [25]:
df2=df.select("name", "favorite_color")
df2.show()

In [26]:
outfilename="namesAndFavColors.parquet"
df2.write.save("/"+outfilename)

A new interface object has been added in **Spark 2.0** called **SparkSession**. A spark session is initialized using a `builder`. For example
```python
spark = SparkSession.builder \
         .master("local") \
         .appName("Word Count") \
         .config("spark.some.config.option", "some-value") \
         .getOrCreate()
```

Using a SparkSession a Parquet file is read [as follows:](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.parquet):
```python
df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
```

## Summary
* Dataframes are an efficient way to store data tables.
* All of the values in a column have the same type.
* A good way to store a dataframe in disk is to use a Parquet file.
* Next: Operations on dataframes.