In [1]:
# Read the events CSV (first line is a header, column types are inferred) and
# drop exact duplicate rows.  Change the filename to one that matches your S3 bucket.
eventDF = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('/mnt/S3/data/dataSetEvents.csv')
    .dropDuplicates()
)

In [2]:
# Render the loaded events so the rows and the field/column names can be inspected.
display(eventDF)

In [3]:
# Print the schema to check the column types that inferSchema picked at load time.
eventDF.printSchema()

In [4]:
# Print Spark's describe() summary for a single field/column ('year').
eventDF.describe('year').show()

In [5]:
# Number of rows for each value of 'make', sorted by make.
# The cell result is the DataFrame object itself; pass it to display() to see the rows.
(eventDF
    .groupBy('make')
    .count()
    .orderBy('make'))

In [6]:
# Render the per-make row counts, sorted by make.
display(
    eventDF
    .groupBy('make')
    .count()
    .orderBy('make')
)

In [7]:
# List the possible makes: project the column, de-duplicate with distinct(), sort.
(eventDF
    .select('make')
    .distinct()
    .orderBy('make'))

In [8]:
 display(eventDF.select('make').distinct().orderBy('make'))

In [9]:
# What do we get if we collect() every value from the DataFrame?
# collect() pulls the rows back to the driver as a Python list of Row objects.
(eventDF
    .select('make')
    .distinct()
    .orderBy('make')
    .collect())

In [10]:
# How many unique VINs are referenced in the data?
(eventDF
    .select('vin')
    .distinct()
    .count())

In [11]:
# Several fields can be selected and grouped together, just like in SQL:
# row count per (make, model) pair, sorted by model and then make.
makeModelDF = (
    eventDF
    .select('make', 'model')
    .groupBy('make', 'model')
    .count()
    .orderBy('model', 'make')
)

In [12]:
# Print the schema of the grouped result (the grouping columns plus the count).
makeModelDF.printSchema()

In [13]:
# Peek at the first five rows of the make/model counts.
makeModelDF.take(5)

In [14]:
# Keep (vin, price) pairs whose price is strictly positive (price > 0).
vinPriceDF = (
    eventDF
    .select('vin', 'price')
    .where(eventDF['price'] > 0)
)

In [15]:
# Render the VIN/price pairs that passed the price > 0 filter.
display(vinPriceDF)

In [16]:
# The DataFrame API has crosstab(): cross-tabulate 'make' against 'model'
# and render the resulting table.
display(eventDF.crosstab('make', 'model'))

In [17]:
# A DataFrame can be converted to an RDD: here each Row becomes a plain
# (make, model) tuple.
makeModelRDD = eventDF.rdd.map(
    lambda record: (record['make'], record['model'])
)

In [18]:
# Peek at the first five (make, model) tuples.
makeModelRDD.take(5)

In [19]:
# A Row object can be converted to a dictionary with asDict().
# Despite the variable name, every column of eventDF is kept, not just make/model.
makeModelAsDictRDD = eventDF.rdd.map(
    lambda record: record.asDict()
)

In [20]:
# Peek at the first five row dictionaries.
makeModelAsDictRDD.take(5)

In [21]:
# Register the de-duplicated events as temp table "table1" so they can be queried with SQL.
# NOTE(review): the q1 cell registers a filtered projection under the same name
# "table1", so this registration is superseded as soon as that cell runs.
eventDF.registerTempTable("table1")

In [22]:
# q1: minimum, maximum and average price for every (make, model).
# Work on distinct (vin, price, make, model) rows and ignore rows without a
# positive price before aggregating in SQL.
eventDF1 = (
    eventDF
    .select('vin', 'price', 'make', 'model')
    .distinct()
)
eventDF1 = eventDF1.where(eventDF1['price'] > 0)
eventDF1.registerTempTable("table1")

pricequeryDF = sqlContext.sql(
    "SELECT make, model, MIN(price) AS MinPrice, MAX(price) AS MaxPrice, AVG(price) AS AvgPrice FROM table1 GROUP BY make, model"
)
display(pricequeryDF)

In [23]:
# q2: minimum, maximum and average mileage per model year, highest average first.
# Work on distinct (vin, mileage, year) rows and ignore rows without a
# positive mileage before aggregating in SQL.
eventDF2 = (
    eventDF
    .select('vin', 'mileage', 'year')
    .distinct()
)
eventDF2 = eventDF2.where(eventDF2['mileage'] > 0)
eventDF2.registerTempTable("table2")

milesqueryDF = sqlContext.sql(
    "SELECT year, MIN(mileage) AS MinMiles, MAX(mileage) AS MaxMiles, AVG(mileage) AS AvgMiles FROM table2 GROUP BY year ORDER BY AvgMiles desc"
)
display(milesqueryDF)

In [24]:
# q3: cross-tabulate 'vin' against 'event' and render the result.
# Presumably this yields one row per VIN with a count column per event value —
# confirm against the DataFrame.crosstab documentation.
eventqueryDF = eventDF.crosstab('vin','event')
display(eventqueryDF)

In [25]:
# Write each query result as a single-partition CSV with a header line.
# mode('overwrite') makes the cell re-runnable: without it save() uses the
# default error-if-exists mode and fails once the output path is present.
# NOTE(review): repartition(1) shuffles the data, so the ORDER BY applied to
# milesqueryDF may not survive into the written file — confirm if row order matters.
for resultDF, outputPath in [
    (pricequeryDF, '/mnt/S3/output/SparkHW3price'),
    (milesqueryDF, '/mnt/S3/output/SparkHW3miles'),
    (eventqueryDF, '/mnt/S3/output/SparkHW3event'),
]:
    (resultDF
        .repartition(1)
        .write
        .format('com.databricks.spark.csv')
        .mode('overwrite')
        .options(header='true')
        .save(outputPath))
