### CLOUDERA CCA175 - PRACTICE TEST

### Question 1: Load the practice dataset and manually define the schema
The dataset is stored here: /databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv

Read it as a CSV, but include option("inferSchema","false") and manually define the schema. The schema should have the form:

* index: integer (nullable = true)
* carat: double (nullable = true)
* cut: string (nullable = true)
* color: string (nullable = true)
* clarity: string (nullable = true)
* depth: double (nullable = true)
* table: double (nullable = true)
* price: integer (nullable = true)
* x: double (nullable = true)
* y: double (nullable = true)
* z: double (nullable = true)

```python
from pyspark.sql.types import *

# Explicit schema; no backslash continuations are needed inside the brackets
schema = StructType([StructField("index", IntegerType(), True),
                     StructField("carat", DoubleType(), True),
                     StructField("cut", StringType(), True),
                     StructField("color", StringType(), True),
                     StructField("clarity", StringType(), True),
                     StructField("depth", DoubleType(), True),
                     StructField("table", DoubleType(), True),
                     StructField("price", IntegerType(), True),
                     StructField("x", DoubleType(), True),
                     StructField("y", DoubleType(), True),
                     StructField("z", DoubleType(), True)])
# `spark` is the SparkSession provided by the Databricks/pyspark shell
diamonds = spark.read.option("header", "true").option("inferSchema", "false").format("csv").load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", schema=schema)
diamonds.printSchema()
diamonds.show(5)
```

### Question 1 - Count how many diamonds are of each colour

```python
from pyspark.sql import functions as F  # used by the later cells

# show() with no argument prints up to 20 rows, so all 7 colours appear
diamonds.groupBy('color').count().show()

# We can also count by multiple properties, using the following format:
diamonds.groupBy(['cut', 'color']).count().show(5)
```

### Question 2 - Make a new column which consists of the first letter of 'cut' followed by the 'color', with a space in between the two
So, for example, if Cut = Ideal and Color = E, there should be a new column called 'CutColor' with 'I E'.

Also, replace the Cut and Color columns with the CutColor column.

```python
# substr(1, 1) takes the first character: Spark string positions start at 1
diamonds.withColumn("CutColor", F.concat(F.col('cut').substr(1, 1), F.lit(' '), F.col('color'))).select('index', 'carat', 'CutColor', 'clarity', 'depth', 'table', 'price', 'x', 'y', 'z').show(5)
```
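`Column.substr` counts positions from 1, while Python slicing counts from 0. Because of that, it can help to check the expected `CutColor` values in plain Python first. The function below is a hypothetical helper, not part of PySpark. It mirrors what the `F.concat` expression computes for one row, including the fact that Spark's `concat` returns null when any input is null.

```python
from typing import Optional


def cut_color(cut: Optional[str], color: Optional[str]) -> Optional[str]:
    """Plain-Python mirror of F.concat(F.col('cut').substr(1, 1), F.lit(' '), F.col('color')).

    Hypothetical helper for checking expected values. Spark's concat returns null
    if any input is null, which is modelled here by returning None.
    """
    if cut is None or color is None:
        return None
    # Spark's substr(1, 1) is the first character; in Python that is cut[0:1]
    return cut[0:1] + " " + color


print(cut_color("Ideal", "E"))      # I E
print(cut_color("Very Good", "J"))  # V J
```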

### Question 3 - Create two tables and then re-join them
Define D1 to be a table with just the index and carat, and D2 to be just the index and the price. Then join them back together on index to get a table with index, carat, and price.

```python
D1 = diamonds.select("index", "carat")
# Rename the key in D2 so the joined result has no ambiguous 'index' column
D2 = diamonds.select(F.col('index').alias('index2'), "price")

D1.join(D2, D1.index == D2.index2).select('index', 'carat', 'price').show(5)
```

### Question 4 - Multiply the dimensions to calculate volume, and then sort the entries by descending volume (with 2 decimal places).
Tip: look into the `round` function (`F.round`), which can be used inside `DF.select`.

```python
DF = diamonds.withColumn('volume', diamonds['x'] * diamonds['y'] * diamonds['z'])
# Sort on the unrounded volume, then round it to 2 decimal places in the projection
DF = DF.sort(F.desc('volume')).select("index", "carat", "cut", "color", "clarity", "depth", "table", "price", F.round(F.col("volume"), 2).alias('volume'))
DF.show(5)
```
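Spark's `round` is documented as rounding HALF_UP (`bround` is the HALF_EVEN variant). Python's built-in `round` instead rounds half to even on the binary float value. To sanity-check a few rounded volumes locally, a sketch like the following mirrors HALF_UP with the `decimal` module. `half_up_2dp` and `volume_2dp` are hypothetical helpers, and the sample dimensions are only illustrative.

```python
from decimal import Decimal, ROUND_HALF_UP


def half_up_2dp(value: float) -> float:
    """Round to 2 decimal places with ties going away from zero (HALF_UP), on the decimal string form."""
    return float(Decimal(str(value)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))


def volume_2dp(x: float, y: float, z: float) -> float:
    """Hypothetical helper: x * y * z rounded HALF_UP to 2 decimal places."""
    return half_up_2dp(x * y * z)


print(volume_2dp(3.95, 3.98, 2.43))         # 38.2
print(round(2.675, 2), half_up_2dp(2.675))  # 2.67 2.68
```

The second line shows why local checks with Python's `round` can disagree with a HALF_UP result on ties.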

### Question 5 - Read in the Table as an RDD, Remove the Header, and Turn it into a DF
You must manually define a schema using StructType and pass it in when creating the DF with spark.createDataFrame.

```python
from pyspark.sql.types import *

# `sc` is the SparkContext provided by the Databricks/pyspark shell
RDD = sc.textFile("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
header = RDD.first()
RDD = RDD.filter(lambda line: line != header)  # drop the header line

RDD1 = RDD.map(lambda l: l.split(","))
# The index and string fields are wrapped in double quotes, so [1:-1] strips them
RDD2 = RDD1.map(lambda values: [int(values[0][1:-1]), float(values[1]), values[2][1:-1]])
schema = StructType([StructField('index', IntegerType(), True),
                     StructField('carat', FloatType(), True),
                     StructField('Grade', StringType(), True)])
df = spark.createDataFrame(RDD2, schema=schema)
df.show(5)
```
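Splitting on commas and slicing off the quotes with `[1:-1]` works here because no field in this file contains a comma, but it is fragile. A more defensive alternative is to use Python's standard `csv` module inside the `map`, which removes the quotes for you. `parse_line` is a hypothetical helper name. The sample line is written in the quoted format that the slicing above assumes.

```python
import csv
from typing import List, Union


def parse_line(line: str) -> List[Union[int, float, str]]:
    """Parse one data line of diamonds.csv into [index, carat, cut].

    csv.reader handles the double quotes around the index and string fields,
    so no [1:-1] slicing is needed.
    """
    values = next(csv.reader([line]))
    return [int(values[0]), float(values[1]), values[2]]


sample = '"1",0.23,"Ideal","E","SI2",61.5,55,326,3.95,3.98,2.43'
print(parse_line(sample))  # [1, 0.23, 'Ideal']
```

In the notebook this would replace the two `map` calls (`RDD2 = RDD.map(parse_line)`), keeping the same `schema` and `spark.createDataFrame` call.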

### Question 6 - Reading in the Table as a DF, Output the Average Price Per Cut

```python
# The select before groupBy is unnecessary; agg only needs the 'price' column
diamonds.groupBy('cut').agg(F.mean('price')).show(5)
```

### Question 7 - Reading in the Table as an RDD, Output the Average Price Per Cut

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

RDD = sc.textFile("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
header = RDD.first()
RDD = RDD.filter(lambda x: x != header)
# Keep (cut, price): strip the quotes around cut and parse price as an int
RDD = RDD.map(lambda x: x.split(',')).map(lambda values: [values[2][1:-1], int(values[7])])
# (cut, (price, 1)) -> sum prices and counts per cut -> divide (true division in Python 3)
RDD = RDD.map(lambda x: (x[0], (x[1], 1)))
RDD = RDD.reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
RDD = RDD.map(lambda x: (x[0], x[1][0] / x[1][1]))
for line in RDD.collect():
    print(line)

schema = StructType([StructField('cut', StringType(), True), StructField('price', DoubleType(), True)])
DF = spark.createDataFrame(RDD, schema=schema)
DF.select(F.col("cut"), F.round(DF["price"], 2).alias('price')).show(5)
```
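The `(price, 1)` trick works because summing `(total, count)` pairs is associative. That means `reduceByKey` can combine them in any order and across partitions, which would not be true of averaging partial averages. The plain-Python sketch below runs the same map → reduceByKey → map pipeline on made-up prices to show the arithmetic. `average_by_key` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict
from functools import reduce
from typing import Dict, Iterable, Tuple


def average_by_key(pairs: Iterable[Tuple[str, int]]) -> Dict[str, float]:
    """Mimic map -> reduceByKey -> map from Question 7 without Spark."""
    # map: (cut, price) -> (cut, (price, 1)), grouped by key as reduceByKey would
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append((value, 1))

    # reduceByKey: the same combining function as in the notebook
    def combine(v1, v2):
        return (v1[0] + v2[0], v1[1] + v2[1])

    totals = {key: reduce(combine, vals) for key, vals in grouped.items()}
    # final map: (sum, count) -> mean
    return {key: total / count for key, (total, count) in totals.items()}


prices = [("Ideal", 300), ("Good", 400), ("Ideal", 500), ("Ideal", 700)]  # made-up values
print(average_by_key(prices))  # {'Ideal': 500.0, 'Good': 400.0}

# Why not average the averages? Split the Ideal prices into two "partitions":
print((300 + 500) / 2, 700 / 1)     # partial means: 400.0 700.0
print(((300 + 500) / 2 + 700) / 2)  # mean of means: 550.0, not the true 500.0
```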

### Really Important Additional Stuff!

#### The first two questions require you to run one sqoop import and one sqoop export. Memorize these commands more or less exactly, but note that the details may change slightly (for example, the --where condition may be different). Make sure the directories are correct too.

```bash
sqoop import --connect jdbc:mysql://gateway/problem1 --username cloudera --password cloudera --table customers --where "city='CA'" --fields-terminated-by ',' --as-textfile --verbose
# Note: --update-mode only takes effect together with --update-key <column>
sqoop export --table solution --connect jdbc:mysql://gateway/problem2 --username cloudera --password cloudera --export-dir '/user/cert/problem2/data/customer' --update-mode allowinsert --verbose
```

#### The remaining 7 questions all require you to read in some data, manipulate it, and save it back to a directory.

* Be extremely careful about the file type you write to. You may have to write CSV, JSON, Parquet, or plain text. In general you just add `.format('json')` (or `'csv'`, `'parquet'`) to the writer before calling `save`. Saving a DataFrame as a text file needs an additional step, because Spark's text writer only accepts a DataFrame with a single string column, so practise that first.
* When you read in the DataFrame, you may need to change the delimiter (for example, for tab-separated data add `.option('delimiter','\t')`).
* You may also be asked to save using snappy compression, in which case add `.option('compression','snappy')` to the writer.

```python
# Example read function:
DF = spark.read.option("header", "false").option("inferSchema", "true").format("csv").option('delimiter', '\t').load("problem3/data")
# Assuming you have predefined `schema` as a StructType (as in Question 1 at the top):
DF = spark.read.option("header", "false").option("inferSchema", "false").format("csv").option('delimiter', '\t').load("problem3/data", schema=schema)

# Example write functions (alternatives: saving to a directory that already exists fails by default):
DF.write.format('parquet').option('compression', 'snappy').save("problem3/solution")
# Assuming you need to write an RDD to text:
RDD.saveAsTextFile('problem3/solution')
# Assuming you need to write a DF to text:
# I still need to figure this out. I got it wrong on the last attempt :/
```
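On the open question of writing a DataFrame as text: `DF.write.text` only accepts a DataFrame with a single string column, so each row first has to be turned into one line. The sketch below assumes a comma-separated line is the required output format. It goes through the RDD API, which the notebook already uses for `saveAsTextFile`. `row_to_line` is a hypothetical helper; since `pyspark.sql.Row` is a tuple, it can be tested on plain tuples.

```python
from typing import Any, Iterable


def row_to_line(row: Iterable[Any], sep: str = ",") -> str:
    """Join one row's fields into a single delimited text line.

    None (Spark null) becomes an empty field. No quoting or escaping is done,
    so this assumes no field contains the separator.
    """
    return sep.join("" if value is None else str(value) for value in row)


print(row_to_line((1, 0.23, "Ideal", "E")))     # 1,0.23,Ideal,E
print(row_to_line((2, None, "Premium"), "\t"))  # fields separated by tabs, empty middle field
```

In the notebook this would be used as `DF.rdd.map(row_to_line).saveAsTextFile('problem3/solution')`.

A DataFrame-only alternative is to build the single column with `concat_ws` and then call the text writer, for example `DF.select(F.concat_ws(',', *[F.col(c).cast('string') for c in DF.columns])).write.text('problem3/solution')`. Note that `concat_ws` skips nulls rather than leaving an empty field, so the two approaches differ on rows with missing values.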