# Project 2
## Hanan Sukenik

### Loading the Data into Kafka

We will begin by using a Docker Compose .yml file to configure and start the Docker containers needed for our project. In this case, we use a Docker Compose file containing Zookeeper, Kafka, Cloudera (HDFS), Spark and the midsw205 base container.
We will examine the Docker Compose file, spin up the cluster and follow the Kafka logs. Finally, we will make sure all of the containers are up.

```bash
cp ../course-content/08-Querying-Data/docker-compose.yml .
vim docker-compose.yml 
docker-compose up -d
docker-compose logs -f kafka
docker-compose ps
```

Next, we will download our data file and examine it, first with "cat" and then with jq, which prints each element of the JSON array on its own compact line:
```bash
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
cat assessment-attempts-20180128-121051-nested.json
docker-compose exec mids bash -c "cat /w205/project-2-hanansuk/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c"
```
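The `jq '.[]' -c` step above takes the file's top-level JSON array and prints each element as one compact JSON line, which is what lets each assessment become its own Kafka message. A rough standard-library Python equivalent might look like this; the function name `explode_array` is hypothetical, and jq's exact output formatting may differ in small details:

```python
import json


def explode_array(text):
    """Return one compact JSON string per element of a top-level JSON array,
    roughly like `jq '.[]' -c`."""
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # separators=(",", ":") drops the spaces json.dumps adds by default;
    # ensure_ascii=False keeps non-ASCII characters as-is, like jq does
    return [json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in records]
```

For example, `print("\n".join(explode_array(open(path, encoding="utf-8").read())))` prints one assessment per line, where `path` points at the downloaded file.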

Next, we will create a new Kafka topic for the messages we are about to send. In this case, the topic is "assessments", as each message represents a different assessment. We will also verify that the topic was created successfully, and then publish the parsed JSON file to the topic using kafkacat in producer mode (`-P`).
Finally, we will (again) make sure the containers are still running.
```bash
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
docker-compose exec mids bash -c "cat /w205/project-2-hanansuk/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments && echo 'Produced assessments.'"
docker-compose ps
``` 

### From Kafka to a Spark Dataframe

Now, we will list the HDFS `/tmp/` directory through the Cloudera container and start a PySpark shell in the Spark container:
```bash
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec spark pyspark
```

And read the messages from Kafka:
```python
# Batch-read the whole topic, from the earliest to the latest offset
assessments = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe","assessments") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load() 
```
Next, we will examine the schema of the data and take an initial look at it:
```python
assessments.printSchema()
assessments.show()
```
It seems like the only column we need here is "value", so we will cast it as a string and extract it to a separate variable, examine its schema and data again, and cache it to avoid warning messages later on:
```python
assessments_as_strings = assessments.selectExpr("CAST(value AS STRING)")
assessments_as_strings.show()
assessments_as_strings.printSchema()
assessments_as_strings.cache()
```
Now that the data is properly read into Spark, it's time to begin unwrapping it, using the json module, to examine the first message:
```python
import json
first_assessment = json.loads(assessments_as_strings.select('value').take(1)[0].value)
first_assessment
first_assessment.keys()
```

It seems like the assessments are rather messy, as they contain nested JSON for the different questions of each assessment. Therefore, we will use json.dumps to pretty print the assessment and really look into it:
```python
print(json.dumps(first_assessment, indent=4, sort_keys=True))
```

Now we will convert the dataframe to an RDD (Spark's Resilient Distributed Dataset), map each message to a `Row` built from its parsed JSON, and convert the result back to a Spark dataframe:
```python
from pyspark.sql import Row
assess_df = assessments_as_strings.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
assess_df.show()
assess_df.printSchema()
```
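The `rdd.map(...)` step above parses each message and spreads its top-level keys into a `Row`, so each top-level key becomes a field while nested values such as `sequences` stay nested inside a single field. The plain-Python sketch below mimics that shape without Spark, on hypothetical sample messages. It sorts the keys because `Row(**kwargs)` sorted fields alphabetically before Spark 3.0 (later versions keep the order as entered).

```python
import json


def to_row_dicts(messages):
    """Parse JSON strings into dicts with alphabetically sorted top-level keys,
    roughly mirroring Row(**json.loads(value)) in Spark 2.x."""
    rows = []
    for value in messages:
        record = json.loads(value)
        # Only the top level is spread into fields; nested objects stay intact
        rows.append({key: record[key] for key in sorted(record)})
    return rows
```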

### Landing the Data into Storage (Hadoop) and Querying

Our next step will be landing the assessments dataframe into storage (HDFS) so that it can be queried efficiently. We will use the Parquet format for that. We will also register a Spark temporary view for analysis:
```python
assess_df.write.parquet("/tmp/assess_df")
# createOrReplaceTempView replaces the deprecated registerTempTable
assess_df.createOrReplaceTempView('assessments_new')
```

Now we can start using SparkSQL to query the data and answer some questions. First, we will examine the data and see what our main challenges are.
As a start, let's look at the first 10 rows of the dataframe, and then at the "certification" and "max_attempts" fields:
```python
spark.sql("select * from assessments_new limit 10").show()
spark.sql("SELECT certification, count(*) from assessments_new GROUP BY certification").show()
spark.sql("SELECT max_attempts, count(*) from assessments_new GROUP BY max_attempts").show()
```

##### We can already notice (at least) 3 issues with the dataset:
* The main challenge will be the nested arrays in the "sequences" field. This is due to the multiple questions in each sequence.
* There are null values in several of our fields
* Some of the exam_name data is aligned to the right and some is aligned to the left
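One quick way to size the second issue is to count, per field, how many records have a missing or null value. The sketch below does this in plain Python over already-parsed records (for example, dicts produced by `json.loads`); the function name and sample records are hypothetical. In Spark itself, a similar count could be written in SQL as `SUM(CASE WHEN some_column IS NULL THEN 1 ELSE 0 END)` for each column.

```python
def null_counts(records, fields):
    """Count records where each field is absent or explicitly null (None)."""
    counts = dict.fromkeys(fields, 0)
    for record in records:
        for field in fields:
            # .get returns None both for missing keys and for JSON null
            if record.get(field) is None:
                counts[field] += 1
    return counts
```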

In order to fully prepare the dataset for analysis, we would want to unwrap the nested arrays, keeping only the specific fields we are interested in. We will not do this in this project.
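As a sketch of what that unwrapping could look like, the function below turns one nested assessment record into one flat row per question, keeping only a few chosen fields. It assumes, hypothetically, that each record has a `sequences` object containing a `questions` list whose items carry an `id`; the real schema should be checked (for example with the pretty print above) before relying on it. In Spark, the same idea is usually expressed with the `explode` function on an array column.

```python
def flatten_questions(record, keep=("keen_id", "exam_name")):
    """Yield one flat dict per question in a nested assessment record.

    Assumes (hypothetically) record["sequences"]["questions"] is a list of
    dicts with an "id" key; adjust to the real schema before use.
    """
    base = {field: record.get(field) for field in keep}
    # Missing or null sequences/questions simply yield no rows
    questions = (record.get("sequences") or {}).get("questions") or []
    for position, question in enumerate(questions):
        row = dict(base)
        row["question_position"] = position
        row["question_id"] = question.get("id")
        yield row
```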

##### Let's begin looking into some questions with SQL, and store some of the results in HDFS:

1. How many assessments are in the dataset?
```python
spark.sql("select COUNT(*) FROM assessments_new").show()
```


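2. How many people took Learning Git? (Strictly speaking, the query counts assessment attempts, so someone who took the assessment more than once is counted more than once.)
```python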
spark.sql("select COUNT(*) FROM assessments_new WHERE exam_name LIKE '%Learning%Git%'").show()
```
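A note on the `LIKE '%Learning%Git%'` filter above: `%` matches any run of characters (including none), `_` matches exactly one character, the pattern must match the whole string, and Spark's `LIKE` is case-sensitive. So the pattern would also catch a hypothetical title such as "Learning Git for Teams", but not "learning git". The sketch below translates such a pattern into a Python regular expression so candidate exam names can be tested; it ignores `LIKE` escape sequences for simplicity.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (% and _ only, no escapes) to a compiled regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    # DOTALL lets % and _ match newlines as well
    return re.compile("".join(parts), re.DOTALL)


def like(value, pattern):
    """True if the whole value matches the LIKE pattern (case-sensitive)."""
    return like_to_regex(pattern).fullmatch(value) is not None
```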


3. What is the least common course taken? And the most common? (We will store this view)
```python
course_count = spark.sql("SELECT exam_name, COUNT(*) as exam_count FROM assessments_new GROUP BY exam_name ORDER BY exam_count DESC")
course_count.show(3000)
course_count.write.parquet("/tmp/course_count")
``` 
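Because the query sorts by `exam_count` in descending order, the most common course is the first row and the least common is the last, which is why the full result is shown. Several courses may tie for the lowest (or highest) count, so it can help to report all of them. A plain-Python sketch of that logic over a list of exam names (the sample names are hypothetical):

```python
from collections import Counter


def most_and_least_common(exam_names):
    """Return (most_common, least_common) lists of (name, count), keeping ties."""
    counts = Counter(exam_names)
    if not counts:
        return [], []
    top = max(counts.values())
    bottom = min(counts.values())
    # Sort ties alphabetically so the output is deterministic
    most = sorted((name, c) for name, c in counts.items() if c == top)
    least = sorted((name, c) for name, c in counts.items() if c == bottom)
    return most, least
```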


4. How many certifications were successful? How many weren't?
```python
spark.sql("SELECT certification, count(*) from assessments_new GROUP BY certification").show()
```


5. How many assessments are there for each value of maximum attempts (max_attempts)?
```python
spark.sql("SELECT max_attempts, count(*) from assessments_new GROUP BY max_attempts").show()
```


6. What are the earliest and latest dates for the assessments in our dataset? (We will store this view)
```python
# Alias the columns: Parquet rejects column names containing characters such as "(" or ")"
assess_dates = spark.sql("SELECT DATE(MIN(started_at)) AS earliest_date, DATE(MAX(started_at)) AS latest_date FROM assessments_new")
assess_dates.show()
assess_dates.write.parquet("/tmp/assess_dates")
``` 
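In the query above, `MIN` and `MAX` are applied to `started_at` before `DATE(...)`. Assuming `started_at` holds ISO-8601 UTC strings such as `2018-01-23T14:23:19.082Z` (an assumption about the format), this works even on plain strings, because timestamps that share the same layout and time zone sort lexicographically in chronological order. A small plain-Python check of that idea, with hypothetical sample values:

```python
from datetime import date


def date_range(timestamps):
    """Return (earliest_date, latest_date) from same-format ISO-8601 UTC strings.

    String min/max is chronological here only because every value shares the
    layout YYYY-MM-DDTHH:MM:SS.fffZ and the same 'Z' (UTC) suffix.
    """
    earliest, latest = min(timestamps), max(timestamps)
    # The first 10 characters are the calendar date (YYYY-MM-DD)
    return date.fromisoformat(earliest[:10]), date.fromisoformat(latest[:10])
```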

We are done with our analysis and data wrangling and can exit Spark:
```python
exit()
```


### Final Steps

The last phase will be looking at the results we have saved into HDFS:
```bash
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec cloudera hadoop fs -ls /tmp/course_count
docker-compose exec cloudera hadoop fs -ls /tmp/assess_dates
```

Now the data is ready for our customers' data scientists to query, including revisiting the questions we have already looked into.
To wrap things up, we will bring the Docker Compose cluster down:
```bash
docker-compose down
```