## Why Data Lakes: Evolution of the Data Warehouse

### Evolution of the Data Warehouse
Q: Is there anything wrong with the data warehouse that makes us need something different?

No. Data warehousing is a **mature field** with many years of accumulated experience and **tried-and-true technologies**. **Dimensional modeling is still extremely relevant** to this day.

**For many organizations, a data warehouse is still the best way to go.** Perhaps the biggest change would be moving from an on-premises deployment to a cloud deployment.

Q: So, why do we need a data lake?

In recent years, many factors have driven the evolution of the data warehouse. To name a few:
* The abundance of unstructured data (text, XML, JSON, logs, sensor data, images, voice, etc.)
* Unprecedented data volumes (social, IoT, machine-generated, etc.)
* The rise of Big Data technologies like HDFS, Spark, etc.
* New types of data analysis gaining momentum, e.g. predictive analytics, recommender systems, graph analytics, etc.
* Emergence of new roles like the data scientist

## Why Data Lakes: Unstructured & Big Data

### Abundance of Unstructured Data
Q: Can we have unstructured data in the data warehouse?

* It might be possible in the ETL process. For instance, we might be able to distill some elements from JSON data and put them in a tabular format.
* But later, we might decide we want to transform the data differently, so choosing a particular transformation up front is a **strong commitment without enough knowledge**. E.g., we start by recording the number of replies in a Facebook comment thread, and later we become interested in the frequency of angry words.
* Some data, like **deeply nested JSON structures**, is hard to put in a tabular format.
* Some data, like text/PDF documents, could be stored as "**blobs**" in a relational database, but it is totally **useless unless processed to extract metrics**.
* The Hadoop Distributed File System (HDFS) made it possible to store petabytes of data on commodity hardware, at a **much lower cost per TB** compared to MPP (massively parallel processing) databases.
* Associated processing tools, from MapReduce, Pig, Hive, and Impala to Spark, made it possible to **process this data at scale on the same hardware used for storage**.
* It is possible to analyze data without first inserting it into a predefined schema. One can load a CSV file and query it without creating a table and inserting the data into it. Similarly, one can process unstructured text. This approach is known as "**Schema-On-Read**".
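To make the first bullet concrete, here is a minimal pure-Python sketch of distilling a nested JSON record into a flat, tabular row during ETL. The record and the `flatten` helper are hypothetical illustrations; note that deciding *how* to flatten (dotted column names, lists kept as JSON text) is exactly the kind of up-front commitment the second bullet warns about.

```python
import json

def flatten(record, parent_key="", sep="."):
    """Flatten nested dicts into one level, e.g. {"a": {"b": 1}} -> {"a.b": 1}.

    Lists have no single obvious tabular form, so they are kept as JSON text.
    """
    row = {}
    for key, value in record.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            row.update(flatten(value, column, sep))
        elif isinstance(value, list):
            row[column] = json.dumps(value)
        else:
            row[column] = value
    return row

# A hypothetical comment record, as it might arrive from an API
raw = '{"id": 1, "user": {"name": "ann", "geo": {"country": "EG"}}, "tags": ["a", "b"]}'
row = flatten(json.loads(raw))
print(row)
# {'id': 1, 'user.name': 'ann', 'user.geo.country': 'EG', 'tags': '["a", "b"]'}
```

If we later want, say, the number of tags instead of the raw list, the already-loaded tabular rows no longer help; we would have to go back to the raw JSON.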

## Why Data Lakes: New Roles & Advanced Analytics
* The data warehouse, by design, follows a **very well-architected** path to make a **clean, consistent, and performant model** that business users can easily use to gain insights and make decisions.
* As data became an asset of the highest value (**"Data is the new oil"**), a role like the **data scientist** started to emerge, seeking value from data.
* The data scientist's job is almost impossible when conforming to a **single rigid representation of data**. Data scientists need the freedom to represent data, join data sets together, retrieve new external data sources, and more.
* Types of analytics such as **machine learning** and **natural language processing** need access to the raw data in forms totally different from a star schema.

### The Data Lake is the new Data Warehouse
* The data lake shares the data warehouse's goal of supporting business insights beyond day-to-day transactional data handling.
* The data lake is a new form of data warehouse that evolved to cope with:
    * The **variety of data formats** and structuring
    * The agile and ad-hoc nature of **data exploration** activities needed by new roles like the **data scientist**
    * The wide spectrum of data transformations needed by **advanced analytics** like machine learning, graph analytics, and recommender systems

## Big Data Effects: Low Costs, ETL Offloading
<img src="images/big_data_effects.png">

## Big Data Effects: Schema-on-Read
* Traditionally, data in a database has been much easier to process than data in plain files.
* Big Data tools in the Hadoop ecosystem, e.g. Hive & Spark, made working with a file as easy as working with a database, without:
    * ~Creating a database~
    * ~Inserting the data into the database~
* **Schema-on-read**: as for the schema of a table (a simple file on disk):
    * It is either inferred
    * Or specified; the data is not inserted into it, but upon read the data is checked against the specified schema
    
<img src="images/schema_on_read_1.png">
<img src="images/schema_on_read_2.png">
<img src="images/schema_on_read_3.png">
<img src="images/schema_on_read_4.png">
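The idea can be sketched without Spark: the file stays on disk exactly as written, and a schema is applied only while reading it. This is a toy, pure-Python sketch with assumed names (`read_with_schema`, the sample columns); values that fail the declared type become `None`, loosely mirroring the null-on-mismatch behavior of Spark's default permissive read mode rather than reproducing Spark itself.

```python
import csv
import io

# The "file on disk": stored as-is, no schema was enforced when it was written.
RAW_CSV = """name,age,city
alice,34,Cairo
bob,not-a-number,Paris
carol,29,
"""

def read_with_schema(text, schema):
    """Apply `schema` (column -> type) only at read time.

    Each value is cast to its declared type; values that fail the cast
    (or are empty) become None instead of aborting the whole read.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        row = {}
        for column, cast in schema.items():
            value = record.get(column)
            try:
                row[column] = cast(value) if value not in (None, "") else None
            except ValueError:
                row[column] = None
        rows.append(row)
    return rows

# Different readers can impose different schemas on the same raw file.
people = read_with_schema(RAW_CSV, {"name": str, "age": int})
cities = read_with_schema(RAW_CSV, {"city": str})
for p in people:
    print(p)
```

Nothing about the raw file had to change for the second reader to use a different schema; that is the flexibility schema-on-read buys.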

## Big Data Effects: (Un-/Semi-)Structured support
* Spark can read/write files in:
    * Many text-based formats: CSV, JSON, plain text
    * Binary formats such as Avro (saves space) and Parquet (columnar)
    * Compressed formats, e.g. gzip & snappy
    
```python
dfLog = spark.read.text("data/NASA_access_log_Jul95.gz")

dfRaw = spark.read.csv("data/news_worldnews.csv")
```
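Spark (through Hadoop's codec support) picks the decompression codec from the file extension, which is why the `.gz` log can be passed straight to `spark.read.text`. One caveat: a gzip file is not splittable, so a single task ends up reading the whole file. For comparison, a plain-Python sketch of the same transparent, one-record-per-line read; the temporary file and its two lines (copied from the NASA log used in Exercise 1) are only for illustration.

```python
import gzip
import os
import tempfile

lines = [
    '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
    'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample_log.gz")
    # Write a gzip-compressed text file
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    # Read it back: decompression is transparent, one record per line
    with gzip.open(path, "rt", encoding="utf-8") as f:
        records = [line.rstrip("\n") for line in f]

print(len(records))
```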

* Read/write from a variety of file systems
    * Local file system
    * HDFS
    * S3
  
  
```python
df = spark.read.csv("s3a://udacity-labs/sports/sport_league.csv")
```

* Read/write from a variety of databases
    * SQL through JDBC
    * NoSQL: MongoDB, Cassandra, Neo4j, ...

```python
# Requires the PostgreSQL JDBC driver jar on Spark's classpath
pgDF = (spark.read.format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://localhost")
        .option("dbtable", "public.pagila")
        .option("user", "postgres")
        .option("password", "postgres")
        .load())
```

* All of these sources are exposed through a single abstraction, the DataFrame, which can also be queried with SQL

## Exercise 1: Schema on Read
The data used in this exercise is from http://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz

### Importing the libraries

```python
import os
from urllib.request import urlretrieve
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings

warnings.filterwarnings('ignore')
%matplotlib inline
```

### Downloading the data

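```python
# Check for the file itself, not just the folder, before downloading
logPath = "data/NASA_access_log_Jul95.gz"
os.makedirs("data", exist_ok=True)
if not os.path.exists(logPath):
    urlretrieve("ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz", filename=logPath)
```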

### Creating spark session

```python
spark = SparkSession.builder.getOrCreate()
```

### Loading the dataset

```python
dfLog = spark.read.text('data/NASA_access_log_Jul95.gz')
```

### Quick inspection of the data set

```python
# see the schema
dfLog.printSchema()
```

```
root
 |-- value: string (nullable = true)
```



```python
# number of lines
dfLog.count()
```

```
1891715
```

```python
# what's in there?
dfLog.show(5)
```

```
+--------------------+
|               value|
+--------------------+
|199.72.81.55 - - ...|
|unicomp6.unicomp....|
|199.120.110.21 - ...|
|burger.letters.co...|
|199.120.110.21 - ...|
+--------------------+
only showing top 5 rows
```



```python
# a better show?
dfLog.show(5, truncate=False)
```

```
+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
+-----------------------
```

```python
# pandas to the rescue
pd.set_option('max_colwidth', 200)
dfLog.limit(5).toPandas()
```

| | value |
|---|---|
| 0 | 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245 |
| 1 | unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 |
| 2 | 199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085 |
| 3 | burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0 |
| 4 | 199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179 |


### Let's try simple parsing with split

```python
dfArrays = dfLog.withColumn("tokenized", fn.split("value", " "))
dfArrays.limit(10).toPandas()
```

| | value | tokenized |
|---|---|---|
| 0 | 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245 | [199.72.81.55, -, -, [01/Jul/1995:00:00:01, -0400], "GET, /history/apollo/, HTTP/1.0", 200, 6245] |
| 1 | unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | [unicomp6.unicomp.net, -, -, [01/Jul/1995:00:00:06, -0400], "GET, /shuttle/countdown/, HTTP/1.0", 200, 3985] |
| 2 | 199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085 | [199.120.110.21, -, -, [01/Jul/1995:00:00:09, -0400], "GET, /shuttle/missions/sts-73/mission-sts-73.html, HTTP/1.0", 200, 4085] |
| 3 | burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0 | [burger.letters.com, -, -, [01/Jul/1995:00:00:11, -0400], "GET, /shuttle/countdown/liftoff.html, HTTP/1.0", 304, 0] |
| 4 | 199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179 | [199.120.110.21, -, -, [01/Jul/1995:00:00:11, -0400], "GET, /shuttle/missions/sts-73/sts-73-patch-small.gif, HTTP/1.0", 200, 4179] |
| 5 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0 | [burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], "GET, /images/NASA-logosmall.gif, HTTP/1.0", 304, 0] |
| 6 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0 | [burger.letters.com, -, -, [01/Jul/1995:00:00:12, -0400], "GET, /shuttle/countdown/video/livevideo.gif, HTTP/1.0", 200, 0] |
| 7 | 205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985 | [205.212.115.106, -, -, [01/Jul/1995:00:00:12, -0400], "GET, /shuttle/countdown/countdown.html, HTTP/1.0", 200, 3985] |
| 8 | d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | [d104.aa.net, -, -, [01/Jul/1995:00:00:13, -0400], "GET, /shuttle/countdown/, HTTP/1.0", 200, 3985] |
| 9 | 129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074 | [129.94.144.152, -, -, [01/Jul/1995:00:00:13, -0400], "GET, /, HTTP/1.0", 200, 7074] |


### Second attempt, let's build a custom parsing UDF

```python
@fn.udf  # no return type given, so Spark assumes StringType
def parseUDF(line):
    import re
    PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return None  # unparseable line -> null
    size_field = match.group(9)
    # '-' means no content size was logged
    size = 0 if size_field == '-' else size_field
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }
```
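Because the UDF body is plain Python, the regex can be checked on a sample line locally before Spark ever runs it. A minimal sketch, assuming a hypothetical helper `parse_line` that reuses the same pattern; unlike the UDF, it also converts `response_code` and `content_size` to integers.

```python
import re

# Same pattern as the UDF, compiled once
PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
)
FIELDS = ["host", "client_identd", "user_id", "date_time",
          "method", "endpoint", "protocol", "response_code", "content_size"]

def parse_line(line):
    """Hypothetical local helper: return a dict of fields, or None if no match."""
    match = PATTERN.search(line)
    if match is None:
        return None
    record = dict(zip(FIELDS, match.groups()))
    record["response_code"] = int(record["response_code"])
    # '-' means the size was not logged
    size = record["content_size"]
    record["content_size"] = 0 if size == "-" else int(size)
    return record

sample = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'
print(parse_line(sample))
```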

```python
# Let's start from the beginning
dfParsed = dfLog.withColumn("parsed", parseUDF("value"))
dfParsed.limit(10).toPandas()
```

| | value | parsed |
|---|---|---|
| 0 | 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245 | {response_code=200, protocol=HTTP/1.0, endpoint=/history/apollo/, content_size=6245, method=GET, date_time=01/Jul/1995:00:00:01 -0400, user_id=-, host=199.72.81.55, client_identd=-} |
| 1 | unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:06 -0400, user_id=-, host=unicomp6.unicomp.net, client_identd=-} |
| 2 | 199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/missions/sts-73/mission-sts-73.html, content_size=4085, method=GET, date_time=01/Jul/1995:00:00:09 -0400, user_id=-, host=199.120.110.21, c... |
| 3 | burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0 | {response_code=304, protocol=HTTP/1.0, endpoint=/shuttle/countdown/liftoff.html, content_size=0, method=GET, date_time=01/Jul/1995:00:00:11 -0400, user_id=-, host=burger.letters.com, client_identd=-} |
| 4 | 199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/missions/sts-73/sts-73-patch-small.gif, content_size=4179, method=GET, date_time=01/Jul/1995:00:00:11 -0400, user_id=-, host=199.120.110.21... |
| 5 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0 | {response_code=304, protocol=HTTP/1.0, endpoint=/images/NASA-logosmall.gif, content_size=0, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=burger.letters.com, client_identd=-} |
| 6 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/video/livevideo.gif, content_size=0, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=burger.letters.com, client... |
| 7 | 205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/countdown.html, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:12 -0400, user_id=-, host=205.212.115.106, client_iden... |
| 8 | d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | {response_code=200, protocol=HTTP/1.0, endpoint=/shuttle/countdown/, content_size=3985, method=GET, date_time=01/Jul/1995:00:00:13 -0400, user_id=-, host=d104.aa.net, client_identd=-} |
| 9 | 129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074 | {response_code=200, protocol=HTTP/1.0, endpoint=/, content_size=7074, method=GET, date_time=01/Jul/1995:00:00:13 -0400, user_id=-, host=129.94.144.152, client_identd=-} |


```python
dfParsed.printSchema()
```

```
root
 |-- value: string (nullable = true)
 |-- parsed: string (nullable = true)
```



### Third attempt, let's fix our UDF

```python
# from pyspark.sql.functions import udf  # already available as fn.udf
from pyspark.sql.types import MapType, StringType

# Declare the return type: a map from string keys to string values
@fn.udf(MapType(StringType(), StringType()))
def parseUDFbetter(line):
    import re
    PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
    match = re.search(PATTERN, line)
    if match is None:
        return None  # unparseable line -> null map
    size_field = match.group(9)
    # '-' means no content size was logged
    size = 0 if size_field == '-' else size_field
    return {
        "host"          : match.group(1),
        "client_identd" : match.group(2),
        "user_id"       : match.group(3),
        "date_time"     : match.group(4),
        "method"        : match.group(5),
        "endpoint"      : match.group(6),
        "protocol"      : match.group(7),
        "response_code" : int(match.group(8)),
        "content_size"  : size
    }
```

```python
# Let's start from the beginning
dfParsed = dfLog.withColumn("parsed", parseUDFbetter("value"))
dfParsed.limit(10).toPandas()
```

| | value | parsed |
|---|---|---|
| 0 | 199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72... |
| 1 | unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni... |
| 2 | 199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us... |
| 3 | burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0 | {'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/liftoff.html', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', 'user_id': '-', 'ho... |
| 4 | 199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'content_size': '4179', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', ... |
| 5 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0 | {'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/images/NASA-logosmall.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-', 'host': ... |
| 6 | burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/video/livevideo.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '... |
| 7 | 205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/countdown.html', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-'... |
| 8 | d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': 'd10... |
| 9 | 129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/', 'content_size': '7074', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': '129.94.144.152', 'cli... |


```python
# Bingo! We've got a column of type map with the parsed fields
dfParsed.printSchema()
```

```
root
 |-- value: string (nullable = true)
 |-- parsed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
```



```python
dfParsed.select("parsed").limit(10).toPandas()
```

| | parsed |
|---|---|
| 0 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/history/apollo/', 'content_size': '6245', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:01 -0400', 'user_id': '-', 'host': '199.72... |
| 1 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:06 -0400', 'user_id': '-', 'host': 'uni... |
| 2 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/mission-sts-73.html', 'content_size': '4085', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:09 -0400', 'us... |
| 3 | {'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/liftoff.html', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', 'user_id': '-', 'ho... |
| 4 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/missions/sts-73/sts-73-patch-small.gif', 'content_size': '4179', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:11 -0400', ... |
| 5 | {'response_code': '304', 'protocol': 'HTTP/1.0', 'endpoint': '/images/NASA-logosmall.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-', 'host': ... |
| 6 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/video/livevideo.gif', 'content_size': '0', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '... |
| 7 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/countdown.html', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:12 -0400', 'user_id': '-'... |
| 8 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/shuttle/countdown/', 'content_size': '3985', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': 'd10... |
| 9 | {'response_code': '200', 'protocol': 'HTTP/1.0', 'endpoint': '/', 'content_size': '7074', 'method': 'GET', 'date_time': '01/Jul/1995:00:00:13 -0400', 'user_id': '-', 'host': '129.94.144.152', 'cli... |


### Let's build separate columns

```python
dfParsed.selectExpr("parsed['host'] as host").limit(5).show(5)
```

```
+--------------------+
|                host|
+--------------------+
|        199.72.81.55|
|unicomp6.unicomp.net|
|      199.120.110.21|
|  burger.letters.com|
|      199.120.110.21|
+--------------------+
```



```python
dfParsed.selectExpr(["parsed['host']", "parsed['date_time']"]).show(5)
```

```
+--------------------+--------------------+
|        parsed[host]|   parsed[date_time]|
+--------------------+--------------------+
|        199.72.81.55|01/Jul/1995:00:00...|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
|  burger.letters.com|01/Jul/1995:00:00...|
|      199.120.110.21|01/Jul/1995:00:00...|
+--------------------+--------------------+
only showing top 5 rows
```



```python
fields = ["host", "client_identd", "user_id", "date_time", "method", "endpoint", "protocol", "response_code", "content_size"]
exprs = ["parsed['{}'] as {}".format(field, field) for field in fields]
exprs
```

```
["parsed['host'] as host",
 "parsed['client_identd'] as client_identd",
 "parsed['user_id'] as user_id",
 "parsed['date_time'] as date_time",
 "parsed['method'] as method",
 "parsed['endpoint'] as endpoint",
 "parsed['protocol'] as protocol",
 "parsed['response_code'] as response_code",
 "parsed['content_size'] as content_size"]
```

```python
dfClean = dfParsed.selectExpr(*exprs)
dfClean.limit(5).toPandas()
```

| | host | client_identd | user_id | date_time | method | endpoint | protocol | response_code | content_size |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 199.72.81.55 | - | - | 01/Jul/1995:00:00:01 -0400 | GET | /history/apollo/ | HTTP/1.0 | 200 | 6245 |
| 1 | unicomp6.unicomp.net | - | - | 01/Jul/1995:00:00:06 -0400 | GET | /shuttle/countdown/ | HTTP/1.0 | 200 | 3985 |
| 2 | 199.120.110.21 | - | - | 01/Jul/1995:00:00:09 -0400 | GET | /shuttle/missions/sts-73/mission-sts-73.html | HTTP/1.0 | 200 | 4085 |
| 3 | burger.letters.com | - | - | 01/Jul/1995:00:00:11 -0400 | GET | /shuttle/countdown/liftoff.html | HTTP/1.0 | 304 | 0 |
| 4 | 199.120.110.21 | - | - | 01/Jul/1995:00:00:11 -0400 | GET | /shuttle/missions/sts-73/sts-73-patch-small.gif | HTTP/1.0 | 200 | 4179 |


### Popular hosts

```python
dfClean.groupBy("host").count().orderBy(fn.desc("count")).limit(10).toPandas()
```

| | host | count |
|---|---|---|
| 0 | piweba3y.prodigy.com | 17572 |
| 1 | piweba4y.prodigy.com | 11591 |
| 2 | piweba1y.prodigy.com | 9868 |
| 3 | alyssa.prodigy.com | 7852 |
| 4 | siltb10.orl.mmc.com | 7573 |
| 5 | piweba2y.prodigy.com | 5922 |
| 6 | edams.ksc.nasa.gov | 5434 |
| 7 | 163.206.89.4 | 4906 |
| 8 | news.ti.com | 4863 |
| 9 | disarray.demon.co.uk | 4353 |


### Popular content

```python
dfClean.groupBy("endpoint").count().orderBy(fn.desc("count")).limit(10).toPandas()
```

| | endpoint | count |
|---|---|---|
| 0 | /images/NASA-logosmall.gif | 111330 |
| 1 | /images/KSC-logosmall.gif | 89638 |
| 2 | /images/MOSAIC-logosmall.gif | 60467 |
| 3 | /images/USA-logosmall.gif | 60013 |
| 4 | /images/WORLD-logosmall.gif | 59488 |
| 5 | /images/ksclogo-medium.gif | 58801 |
| 6 | /images/launch-logo.gif | 40871 |
| 7 | /shuttle/countdown/ | 40278 |
| 8 | /ksc.html | 40226 |
| 9 | /images/ksclogosmall.gif | 33585 |


### Large Files

```python
dfClean.createOrReplaceTempView("cleanlog")
spark.sql("""
select endpoint, content_size
from cleanlog
order by content_size desc
""").limit(10).toPandas()
```

| | endpoint | content_size |
|---|---|---|
| 0 | /images/cdrom-1-95/img0007.jpg | 99981 |
| 1 | /shuttle/missions/sts-71/movies/sts-71-launch.mpg | 999424 |
| 2 | /shuttle/missions/sts-71/movies/sts-71-launch.mpg | 999424 |
| 3 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 4 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 5 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 6 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 7 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 8 | /history/apollo/apollo-13/images/index.gif | 99942 |
| 9 | /history/apollo/apollo-13/images/index.gif | 99942 |
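This ordering is not by size: `content_size` is still a string column (the map values are strings), so `order by` compares text character by character, which puts `99981` ahead of `999424`. Casting to an integer, as done next, fixes it. The same effect in plain Python, using sizes that appear in the two result tables:

```python
sizes = ["99981", "999424", "99942", "6823936", "3155499"]

# Text comparison: character by character, so "99981" > "999424"
lexicographic = sorted(sizes, reverse=True)

# Numeric comparison: what ordering by the integer-cast column does
numeric = sorted(sizes, key=int, reverse=True)

print(lexicographic)  # ['99981', '999424', '99942', '6823936', '3155499']
print(numeric)        # ['6823936', '3155499', '999424', '99981', '99942']
```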


```python
dfCleanTyped = dfClean.withColumn("content_size_bytes", fn.expr("cast(content_size as int)"))
dfCleanTyped.limit(5).toPandas()
```

| | host | client_identd | user_id | date_time | method | endpoint | protocol | response_code | content_size | content_size_bytes |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 199.72.81.55 | - | - | 01/Jul/1995:00:00:01 -0400 | GET | /history/apollo/ | HTTP/1.0 | 200 | 6245 | 6245 |
| 1 | unicomp6.unicomp.net | - | - | 01/Jul/1995:00:00:06 -0400 | GET | /shuttle/countdown/ | HTTP/1.0 | 200 | 3985 | 3985 |
| 2 | 199.120.110.21 | - | - | 01/Jul/1995:00:00:09 -0400 | GET | /shuttle/missions/sts-73/mission-sts-73.html | HTTP/1.0 | 200 | 4085 | 4085 |
| 3 | burger.letters.com | - | - | 01/Jul/1995:00:00:11 -0400 | GET | /shuttle/countdown/liftoff.html | HTTP/1.0 | 304 | 0 | 0 |
| 4 | 199.120.110.21 | - | - | 01/Jul/1995:00:00:11 -0400 | GET | /shuttle/missions/sts-73/sts-73-patch-small.gif | HTTP/1.0 | 200 | 4179 | 4179 |


```python
dfCleanTyped.createOrReplaceTempView("cleantypedlog")
spark.sql("""
select endpoint, content_size
from cleantypedlog
order by content_size_bytes desc
""").limit(10).toPandas()
```

| | endpoint | content_size |
|---|---|---|
| 0 | /shuttle/countdown/video/livevideo.jpeg | 6823936 |
| 1 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 2 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 3 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 4 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 5 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 6 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 7 | /statistics/1995/bkup/Mar95_full.html | 3155499 |
| 8 | /statistics/1995/Jun/Jun95_reverse_domains.html | 2973350 |
| 9 | /statistics/1995/Jun/Jun95_reverse_domains.html | 2973350 |


### Creating the date column

```python
# The log writes "01/Jul/1995:00:00:01 -0400": note the space before the offset (Z)
dfCleanTypedDate = dfCleanTyped.withColumn('date_time',
                                            fn.from_unixtime(
                                                fn.unix_timestamp('date_time',
                                                                  'dd/MMM/yyyy:HH:mm:ss Z')
                                            ).cast('TIMESTAMP'))
dfCleanTypedDate.limit(5).toPandas()
```

| | host | client_identd | user_id | date_time | method | endpoint | protocol | response_code | content_size | content_size_bytes |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 199.72.81.55 | - | - | 1995-07-01 05:00:01 | GET | /history/apollo/ | HTTP/1.0 | 200 | 6245 | 6245 |
| 1 | unicomp6.unicomp.net | - | - | 1995-07-01 05:00:06 | GET | /shuttle/countdown/ | HTTP/1.0 | 200 | 3985 | 3985 |
| 2 | 199.120.110.21 | - | - | 1995-07-01 05:00:09 | GET | /shuttle/missions/sts-73/mission-sts-73.html | HTTP/1.0 | 200 | 4085 | 4085 |
| 3 | burger.letters.com | - | - | 1995-07-01 05:00:11 | GET | /shuttle/countdown/liftoff.html | HTTP/1.0 | 304 | 0 | 0 |
| 4 | 199.120.110.21 | - | - | 1995-07-01 05:00:11 | GET | /shuttle/missions/sts-73/sts-73-patch-small.gif | HTTP/1.0 | 200 | 4179 | 4179 |
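Spark parses the trailing `-0400` offset into an absolute instant and then displays it in the session time zone (`spark.sql.session.timeZone`), so the wall-clock values shown above depend on where the notebook ran. A plain-Python sketch of the same conversion for the first log line, normalized to UTC for an unambiguous result:

```python
from datetime import datetime, timezone

raw = "01/Jul/1995:00:00:01 -0400"
# %z accepts offsets like -0400; %b expects English month abbreviations
parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
as_utc = parsed.astimezone(timezone.utc)
print(as_utc.isoformat())  # 1995-07-01T04:00:01+00:00
```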


```python
dfCleanTypedDate.printSchema()
```

```
root
 |-- host: string (nullable = true)
 |-- client_identd: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- response_code: string (nullable = true)
 |-- content_size: string (nullable = true)
 |-- content_size_bytes: integer (nullable = true)
```



## Exercise 2: Advanced Analytics NLP
<span style="color:red;font-size:1.5em;">Note: Restart the kernel before running the following blocks</span>

```python
!pip install spark-nlp==1.7.3
```



```python
import pandas as pd
pd.set_option('max_colwidth', 800)
```

### Create a Spark session that includes a 3rd-party jar for NLP

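```python
#jarPath = "spark-nlp-assembly-1.7.3.jar"

from pyspark.sql import SparkSession
# spark.jars.packages only takes effect when a new session is created;
# getOrCreate() reuses an existing one, hence the kernel restart above.
# Note: the pip install above pins 1.7.3; keeping the Python and JVM
# package versions aligned is generally the safer choice.
spark = SparkSession.builder \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.8.2") \
    .getOrCreate()
spark
```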

### Read multiple files in a directory as one DataFrame

```python
dataPath = "./data/reddit/*.json"
df = spark.read.json(dataPath)
print(df.count())
df.printSchema()
```

```
100
root
 |-- data: struct (nullable = true)
 |    |-- approved_at_utc: string (nullable = true)
 |    |-- approved_by: string (nullable = true)
 |    |-- archived: boolean (nullable = true)
 |    |-- author: string (nullable = true)
 |    |-- author_flair_background_color: string (nullable = true)
 |    |-- author_flair_css_class: string (nullable = true)
 |    |-- author_flair_richtext: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- author_flair_template_id: string (nullable = true)
 |    |-- author_flair_text: string (nullable = true)
 |    |-- author_flair_text_color: string (nullable = true)
 |    |-- author_flair_type: string (nullable = true)
 |    |-- author_fullname: string (nullable = true)
 |    |-- author_patreon_flair: boolean (nullable = true)
 |    |-- banned_at_utc: string (nullable = true)
 |    |-- banned_by: string (nullable = true)
 |    |-- can_gild: boolean (nullable = true)
 |    |-- can_mod_post: boolean (nullable = true)
```

### Deal with Struct type to query subfields 

```python
title = "data.title"
author = "data.author"
dfAuthorTilte = df.select(title, author)
dfAuthorTilte.limit(5).toPandas()
```

| | title | author |
|---|---|---|
| 0 | Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe. | jaykirsch |
| 1 | Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash | canuck_burger |
| 2 | Iranian "morality police" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle. | honolulu_oahu_mod |
| 3 | Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report | madam1 |
| 4 | NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India | purplexxx |


### Try to implement the equivalent of flatMap in dataframes

```python
import pyspark.sql.functions as F

dfWordCount = (df.select(F.explode(F.split(title, "\\s+")).alias("word"))
                 .groupBy("word").count()
                 .orderBy(F.desc("count")))
dfWordCount.limit(10).toPandas()
```

| | word | count |
|---|---|---|
| 0 | to | 58 |
| 1 | the | 46 |
| 2 | of | 42 |
| 3 | in | 41 |
| 4 | a | 25 |
| 5 | for | 20 |
| 6 | and | 19 |
| 7 | from | 12 |
| 8 | on | 11 |
| 9 | with | 10 |
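`explode` plays the role of `flatMap` here: `split` turns each title into an array of words, and `explode` emits one row per array element, so one input row becomes many. A plain-Python sketch of the same split, flatten, and count pipeline, on two made-up titles:

```python
import re
from collections import Counter
from itertools import chain

# Hypothetical sample titles
titles = [
    "NASA reports the Earth is greener",
    "the Earth is warming",
]

# flatMap step: one title -> many words (like F.explode(F.split(title, "\\s+")))
words = chain.from_iterable(re.split(r"\s+", t) for t in titles)

# groupBy("word").count().orderBy(desc("count"))
word_counts = Counter(words).most_common()
print(word_counts[:3])
```

As in the Spark output, the most frequent tokens are stop words such as "the", which is why real NLP pipelines usually filter them out before counting.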


### Use an NLP library to do Part-of-Speech Tagging

```python
from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp

dfAnnotated = bp.annotate(dfAuthorTilte, "title")
dfAnnotated.printSchema()
```

```
root
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- normal: array (nullable = true)
 |    |-- element: struct (contains
```

### Deal with Map type to query subfields

In [8]:
```python
dfPos = dfAnnotated.select("text", "pos.metadata", "pos.result")
dfPos.limit(5).toPandas()
```

| | text | metadata | result |
|---|---|---|---|
| 0 | Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe. | [{'word': 'Microsoft'}, {'word': 'Corp'}, {'word': 'said'}, {'word': 'it'}, {'word': 'has'}, {'word': 'discovered'}, {'word': 'hacking'}, {'word': 'targeting'}, {'word': 'democratic'}, {'word': 'institutions'}, {'word': 'think'}, {'word': 'tanks'}, {'word': 'and'}, {'word': 'nonprofit'}, {'word': 'organizations'}, {'word': 'in'}, {'word': 'Europe'}] | [NNP, NNP, VBD, PRP, VBZ, VBN, VBG, VBG, JJ, NNS, VBP, NNS, CC, NN, NNS, IN, NNP] |
| 1 | Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash | [{'word': 'Deutsche'}, {'word': 'Bank'}, {'word': 'reportedly'}, {'word': 'planned'}, {'word': 'to'}, {'word': 'extend'}, {'word': 'the'}, {'word': 'dates'}, {'word': 'of'}, {'word': 'million'}, {'word': 'in'}, {'word': 'loans'}, {'word': 'to'}, {'word': 'Trump'}, {'word': 'Organization'}, {'word': 'to'}, {'word': 'avoid'}, {'word': 'a'}, {'word': 'potential'}, {'word': 'nightmare'}, {'word': 'of'}, {'word': 'chasing'}, {'word': 'a'}, {'word': 'sitting'}, {'word': 'president'}, {'word': 'for'}, {'word': 'cash'}] | [NNP, NNP, RB, VBD, TO, VB, DT, NNS, IN, CD, IN, NNS, TO, NNP, NNP, TO, VB, DT, JJ, NN, IN, VBG, DT, VBG, NN, IN, NN] |
| 2 | Iranian "morality police" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle. | [{'word': 'Iranian'}, {'word': 'morality'}, {'word': 'police'}, {'word': 'were'}, {'word': 'forced'}, {'word': 'to'}, {'word': 'fire'}, {'word': 'warning'}, {'word': 'shots'}, {'word': 'when'}, {'word': 'a'}, {'word': 'crowd'}, {'word': 'intervened'}, {'word': 'to'}, {'word': 'prevent'}, {'word': 'them'}, {'word': 'from'}, {'word': 'arresting'}, {'word': 'two'}, {'word': 'women'}, {'word': 'for'}, {'word': 'not'}, {'word': 'wearing'}, {'word': 'a'}, {'word': 'hijab'}, {'word': 'The'}, {'word': 'incident'}, {'word': 'occurred'}, {'word': 'in'}, {'word': 'Tehran'}, {'word': 's'}, {'word': 'northeastern'}, {'word': 'Narmak'}, {'word': 'neighbourhood'}, {'word': 'on'}, {'word': 'Friday'}, {'word': 'night'}, {'word': 'and'}, {'word': 'ended'}, {'word': 'with'}, {'word': 'a'}, {'word': 'mob'... | [JJ, NN, NN, VBD, VBN, TO, VB, NN, NNS, WRB, DT, NN, VBD, TO, VB, PRP, IN, VBG, CD, NNS, IN, RB, VBG, DT, NN, DT, NN, VBD, IN, NNP, VBZ, JJ, NNP, NN, IN, NNP, NN, CC, VBD, IN, DT, NN, VBG, DT, NN, RP, DT, NN, NN] |
| 3 | Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report | [{'word': 'Trump'}, {'word': 'administration'}, {'word': 'pushing'}, {'word': 'Saudi'}, {'word': 'nuclear'}, {'word': 'deal'}, {'word': 'which'}, {'word': 'could'}, {'word': 'benefit'}, {'word': 'company'}, {'word': 'linked'}, {'word': 'to'}, {'word': 'Jared'}, {'word': 'Kushner'}, {'word': 'Senior'}, {'word': 'Trump'}, {'word': 'administration'}, {'word': 'officials'}, {'word': 'pushed'}, {'word': 'a'}, {'word': 'project'}, {'word': 'to'}, {'word': 'share'}, {'word': 'nuclear'}, {'word': 'power'}, {'word': 'technology'}, {'word': 'with'}, {'word': 'Saudi'}, {'word': 'Arabia'}, {'word': 'over'}, {'word': 'the'}, {'word': 'objections'}, {'word': 'of'}, {'word': 'ethics'}, {'word': 'officials'}, {'word': 'according'}, {'word': 'to'}, {'word': 'a'}, {'word': 'congressional'}, {'word': 're... | [NNP, NN, VBG, NNP, NN, NN, WDT, MD, VB, NN, VBN, TO, NNP, NNP, NNP, NNP, NN, NNS, VBD, DT, NN, TO, VB, JJ, NN, NN, IN, NNP, NNP, IN, DT, NNS, IN, NNS, NNS, VBG, TO, DT, JJ, NN] |
| 4 | NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India | [{'word': 'NASA'}, {'word': 'Happily'}, {'word': 'Reports'}, {'word': 'the'}, {'word': 'Earth'}, {'word': 'is'}, {'word': 'Greener'}, {'word': 'With'}, {'word': 'More'}, {'word': 'Trees'}, {'word': 'Than'}, {'word': 'Years'}, {'word': 'Agoand'}, {'word': 'It'}, {'word': 's'}, {'word': 'Thanks'}, {'word': 'to'}, {'word': 'China'}, {'word': 'India'}] | [NNP, NNP, NNS, DT, NNP, VBZ, NNP, IN, JJR, NNP, IN, NNS, NNP, PRP, VBZ, NNS, TO, NNP, NNP] |


In [9]:
```python
dfPos = dfAnnotated.select(F.explode("pos").alias("pos"))
dfPos.printSchema()
dfPos.toPandas()
```

```text
root
 |-- pos: struct (nullable = true)
 |    |-- annotatorType: string (nullable = true)
 |    |-- begin: integer (nullable = false)
 |    |-- end: integer (nullable = false)
 |    |-- result: string (nullable = true)
 |    |-- metadata: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
```



| | pos |
|---|---|
| 0 | (pos, 0, 8, NNP, {'word': 'Microsoft'}) |
| 1 | (pos, 10, 13, NNP, {'word': 'Corp'}) |
| 2 | (pos, 15, 18, VBD, {'word': 'said'}) |
| 3 | (pos, 20, 21, PRP, {'word': 'it'}) |
| 4 | (pos, 23, 25, VBZ, {'word': 'has'}) |
| ... | ... |
| 1624 | (pos, 60, 61, IN, {'word': 'of'}) |
| 1625 | (pos, 63, 74, JJ, {'word': 'unapologetic'}) |
| 1626 | (pos, 76, 87, JJ, {'word': 'antisemitic'}) |
| 1627 | (pos, 89, 95, NNS, {'word': 'attacks'}) |
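
`F.explode("pos")` turns each element of the `pos` array into its own row, which is how the titles become 1,628 rows. The plain-Python sketch below does the same flattening. It uses a hypothetical annotated record shaped like the schema above, with each struct modeled as a `dict`.

```python
annotated = [  # hypothetical output of the POS annotator, one dict per title
    {"text": "Microsoft Corp said",
     "pos": [
         {"annotatorType": "pos", "begin": 0, "end": 8, "result": "NNP", "metadata": {"word": "Microsoft"}},
         {"annotatorType": "pos", "begin": 10, "end": 13, "result": "NNP", "metadata": {"word": "Corp"}},
         {"annotatorType": "pos", "begin": 15, "end": 18, "result": "VBD", "metadata": {"word": "said"}},
     ]},
]

# explode: one output row per array element; a row with an empty array produces no output
exploded = [{"pos": element} for row in annotated for element in row["pos"]]
print(len(exploded))
```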


### Keep only proper nouns NNP or NNPS

In [10]:
```python
nnpFilter = "pos.result = 'NNP' or pos.result = 'NNPS' "
dfNNP = dfPos.where(nnpFilter)
dfNNP.limit(10).toPandas()
```

| | pos |
|---|---|
| 0 | (pos, 0, 8, NNP, {'word': 'Microsoft'}) |
| 1 | (pos, 10, 13, NNP, {'word': 'Corp'}) |
| 2 | (pos, 126, 131, NNP, {'word': 'Europe'}) |
| 3 | (pos, 0, 7, NNP, {'word': 'Deutsche'}) |
| 4 | (pos, 9, 12, NNP, {'word': 'Bank'}) |
| 5 | (pos, 81, 85, NNP, {'word': 'Trump'}) |
| 6 | (pos, 87, 98, NNP, {'word': 'Organization'}) |
| 7 | (pos, 175, 180, NNP, {'word': 'Tehran'}) |
| 8 | (pos, 197, 202, NNP, {'word': 'Narmak'}) |
| 9 | (pos, 221, 226, NNP, {'word': 'Friday'}) |


### Extract columns from a map in a column

In [11]:
```python
dfWordTag = dfNNP.selectExpr("pos.metadata['word'] as word", "pos.result as tag")
dfWordTag.limit(10).toPandas()
```

| | word | tag |
|---|---|---|
| 0 | Microsoft | NNP |
| 1 | Corp | NNP |
| 2 | Europe | NNP |
| 3 | Deutsche | NNP |
| 4 | Bank | NNP |
| 5 | Trump | NNP |
| 6 | Organization | NNP |
| 7 | Tehran | NNP |
| 8 | Narmak | NNP |
| 9 | Friday | NNP |
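
The `where` filter and the `pos.metadata['word']` lookup together combine a struct-field access with a map-key access. The plain-Python sketch below performs both on hypothetical exploded rows, with each struct and map modeled as a `dict`.

```python
exploded = [  # hypothetical rows, each holding one POS struct
    {"pos": {"result": "NNP", "metadata": {"word": "Microsoft"}}},
    {"pos": {"result": "VBD", "metadata": {"word": "said"}}},
    {"pos": {"result": "NNPS", "metadata": {"word": "Organizations"}}},
]

PROPER_NOUN_TAGS = {"NNP", "NNPS"}

# where("pos.result = 'NNP' or pos.result = 'NNPS'") followed by
# selectExpr("pos.metadata['word'] as word", "pos.result as tag")
word_tags = [
    # .get returns None for a missing key instead of raising
    {"word": row["pos"]["metadata"].get("word"), "tag": row["pos"]["result"]}
    for row in exploded
    if row["pos"]["result"] in PROPER_NOUN_TAGS
]
print(word_tags)
```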


In [12]:
```python
from pyspark.sql.functions import desc
dfWordTag.groupBy("word").count().orderBy(desc("count")).show()
```

```text
+--------+-----+
|    word|count|
+--------+-----+
|      US|   14|
|   Trump|    9|
|   Saudi|    8|
|   Putin|    7|
|  Russia|    6|
|  Arabia|    5|
|  Europe|    5|
|Catholic|    4|
|Vladimir|    4|
|      UK|    4|
|   China|    3|
|   South|    3|
|   House|    3|
| Germany|    3|
|    Pope|    3|
|  Church|    3|
|   Egypt|    3|
|  Middle|    2|
|     UBS|    2|
| Mueller|    2|
+--------+-----+
only showing top 20 rows
```



## Data Lake Implementation Introduction
### The Data Lake
* Originally, data lakes were implemented on the Hadoop ecosystem, namely HDFS and processing tools such as Hive, Pig, Impala and Spark.
* Over time, the big data tool landscape grew, and Hadoop became one possible solution rather than the only one.
* We will start with the original idea and then explain the different ways to implement a data lake on AWS.

## Data Lake Concepts

<img src="images/data_lake_concepts_1.png">
<img src="images/data_lake_concepts_2.png">
<img src="images/data_lake_concepts_3.png">
<img src="images/data_lake_concepts_4.png">
<img src="images/data_lake_concepts_5.png">

## Data Lake vs Data Warehouse
| | Data Warehouse | Data Lake |
|---|---|---|
| **Data form** | Tabular format | All formats |
| **Data value** | High only | High-value, medium-value and to-be-discovered |
| **Ingestion** | ETL | ELT |
| **Data model** | Star & snowflake with conformed dimensions, or data marts and OLAP cubes | Star, snowflake and OLAP models are possible, as are other ad-hoc representations |
| **Schema** | Known before ingestion (schema-on-write) | On-the-fly at the time of analysis (schema-on-read) |
| **Technology** | Expensive MPP databases with expensive disks and connectivity | Commodity hardware with parallelism as a first principle |
| **Data quality** | High, with effort for consistency and clear rules for accessibility | Mixed: some data remains in raw format, some is transformed to higher quality |
| **Users** | Business analysts | Data scientists, business analysts & ML engineers |
| **Analytics** | Reports and business intelligence visualizations | Machine learning, graph analytics and data exploration |
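
The **Schema** row is the central contrast between the two. The plain-Python sketch below uses hypothetical helpers and events to illustrate it. Schema-on-write parses and types every record before storing it, so any field outside the schema (here `device`) is lost for good. Schema-on-read keeps the raw text and applies whichever schema a later analysis needs.

```python
import json

SCHEMA = {"user": str, "clicks": int}  # hypothetical warehouse schema

def write_validated(store, raw):
    """Schema-on-write: parse and type-check before ingestion; bad records raise here."""
    record = json.loads(raw)
    store.append({name: cast(record[name]) for name, cast in SCHEMA.items()})

def read_with_schema(raw_store, schema):
    """Schema-on-read: raw records are kept as-is and typed only at query time."""
    rows = []
    for raw in raw_store:
        record = json.loads(raw)
        rows.append({name: cast(record[name]) if name in record else None
                     for name, cast in schema.items()})
    return rows

raw_events = ['{"user": "a", "clicks": "3"}', '{"user": "b", "clicks": 5, "device": "ios"}']

warehouse = []
for event in raw_events:
    write_validated(warehouse, event)   # the "device" field is dropped at ingestion

lake = list(raw_events)                 # stored raw; no transformation decided yet
later = read_with_schema(lake, {"user": str, "device": str})  # a question asked later
print(warehouse)
print(later)
```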

## Data Lake options on AWS
|Storage|Processing|AWS-Managed Solution|Vendor-Managed|
|-------|----------|--------------------|--------------|
|HDFS|Spark|AWS EMR (HDFS + Spark)|EC2 + Vendor Solution|
|S3|Spark|AWS EMR (Spark only)|EC2 + Vendor Solution|
|S3|Serverless|AWS Athena|Serverless + Vendor Solution|

## AWS Options: EMR (HDFS + Spark)
<img src="images/aws_emr_solution_1.png">

## AWS Options: EMR (S3 + Spark)
<img src="images/data_lake_option2.png">

## AWS Options: Athena

<img src="images/data_lake_options3.png">

## Exercise 3 - Data Lake on S3
To complete this exercise, you need to download the `credentials.csv` file for your AWS user and place it in a `credentials/` folder next to the notebook.

<span style="color:red;font-size:1.5em;">Note: Restart the kernel before running the following blocks</span>

### Importing the libraries

In [1]:
```python
from pyspark.sql import SparkSession
import os
import configparser
import pandas as pd
```

### Make sure that your AWS credentials are loaded as env vars

In [2]:
```python
credentials = pd.read_csv('credentials/credentials.csv')
os.environ["AWS_ACCESS_KEY_ID"] = credentials['Access key ID'][0]
os.environ["AWS_SECRET_ACCESS_KEY"] = credentials['Secret access key'][0]
```
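
The import cell above loads `configparser` but never uses it. A common alternative is to keep the keys in an INI-style config file outside version control. The sketch below shows that approach. It assumes a hypothetical `dl.cfg` with an `[AWS]` section and parses sample text, so it runs without a file on disk.

```python
import configparser
import os

def load_aws_credentials(config_text):
    """Read AWS keys from INI-formatted text and export them as environment variables."""
    config = configparser.ConfigParser()
    config.read_string(config_text)  # for a real file: config.read("dl.cfg") (hypothetical name)
    os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["AWS_ACCESS_KEY_ID"]
    os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["AWS_SECRET_ACCESS_KEY"]

# Hypothetical dl.cfg contents with placeholder values; never commit real keys
sample_cfg = """
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLEKEYID
AWS_SECRET_ACCESS_KEY = EXAMPLESECRET
"""
load_aws_credentials(sample_cfg)
```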

### Create spark session with hadoop-aws package

In [3]:
```python
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()
```

### Load data from S3

In [4]:
```python
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv")
```

In [5]:
```python
df.printSchema()
df.show(5)
```

```text
root
 |-- _c0: string (nullable = true)

+--------------------+
|                 _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows
```
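
With no options, `spark.read.csv` assumes a comma separator and no header. That is why each semicolon-separated line above lands in a single `_c0` column and the header line is read as data. Python's `csv` module shows the same effect in the sketch below. The sample lines are hypothetical, shortened versions of the file's rows.

```python
import csv
import io

sample = "payment_id;customer_id;amount\n16050;269;1.99\n"  # hypothetical, shortened rows

# Default delimiter is a comma, so each line stays one field (like the single _c0 column)
naive = list(csv.reader(io.StringIO(sample)))
print(naive)

# With the right delimiter and the first line used as the header, the columns split correctly,
# but every value is still a string until types are inferred or declared
parsed = list(csv.DictReader(io.StringIO(sample), delimiter=";"))
print(parsed)
```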



### Infer schema, fix header and separator

In [6]:
```python
df = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv", sep=';', inferSchema=True, header=True)
```

In [7]:
```python
df.printSchema()
df.show(5)
```

```text
root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: string (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows
```



### Fix the data yourself 

In [8]:
```python
import pyspark.sql.functions as F
dfPayment = df.withColumn("payment_date", F.to_timestamp("payment_date"))
dfPayment.printSchema()
dfPayment.show(5)
```

```text
root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: timestamp (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows
```



### Extract the month

In [9]:
```python
dfPayment = dfPayment.withColumn('month', F.month('payment_date'))
dfPayment.show(5)
```

```text
+----------+-----------+--------+---------+------+--------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|month|
+----------+-----------+--------+---------+------+--------------------+-----+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|    1|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|    1|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|    1|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|    1|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|    1|
+----------+-----------+--------+---------+------+--------------------+-----+
only showing top 5 rows
```



### Compute aggregate revenue per month

In [10]:
```python
dfPayment.createOrReplaceTempView('payment')
spark.sql("""
    SELECT month, sum(amount) AS revenue
    FROM payment
    GROUP BY month
    ORDER BY revenue DESC
""").show()
```

```text
+-----+------------------+
|month|           revenue|
+-----+------------------+
|    4|28327.020000003868|
|    3|23886.560000002115|
|    2| 9631.879999999608|
|    1| 4824.429999999856|
|    5| 746.6200000000017|
+-----+------------------+
```
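
Long tails such as `28327.020000003868` come from summing binary floating-point `double` values. The plain-Python sketch below runs the same group-by-month aggregation on the five rows shown earlier, using only their date parts because the times are truncated. It contrasts `float` with exact `Decimal` arithmetic. In Spark, the analogous options would be casting `amount` to a `DecimalType` or rounding the sum, e.g. with `F.round`.

```python
from collections import defaultdict
from datetime import date
from decimal import Decimal

# (payment_date, amount) taken from the dfPayment.show(5) output above, date part only
payments = [
    ("2017-01-24", "1.99"), ("2017-01-25", "0.99"), ("2017-01-28", "6.99"),
    ("2017-01-29", "0.99"), ("2017-01-29", "4.99"),
]

float_revenue = defaultdict(float)
exact_revenue = defaultdict(Decimal)
for day, amount in payments:
    month = date.fromisoformat(day).month    # like F.month('payment_date')
    float_revenue[month] += float(amount)    # binary doubles: rounding error can accumulate
    exact_revenue[month] += Decimal(amount)  # exact decimal arithmetic

print(dict(float_revenue))
print(dict(exact_revenue))
```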



### Fix the schema

In [11]:
```python
import pyspark.sql.types as typ

paymentSchema = typ.StructType([
    typ.StructField('payment_id',typ.IntegerType()),
    typ.StructField('customer_id',typ.IntegerType()),
    typ.StructField('staff_id',typ.IntegerType()),
    typ.StructField('rental_id',typ.IntegerType()),
    typ.StructField('amount',typ.DoubleType()),
    typ.StructField('payment_date',typ.DateType())
])
```
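
Supplying a schema fixes each column's type up front. It also avoids the extra pass over the data that `inferSchema` requires. The plain-Python sketch below models that idea as a list of (name, converter) pairs applied by column position while reading. The converters stand in for the Spark types, and the sample line, including its timestamp, is hypothetical.

```python
import csv
import io
from datetime import datetime

# Hypothetical stand-in for paymentSchema: (column name, converter) in column order
payment_schema = [
    ("payment_id", int), ("customer_id", int), ("staff_id", int),
    ("rental_id", int), ("amount", float),
    # keep only the date part, as a date column would hold
    ("payment_date", lambda s: datetime.strptime(s[:10], "%Y-%m-%d").date()),
]

def parse_with_schema(text, schema, sep=";"):
    """Parse delimited text, skip the header line, and convert each field by position."""
    reader = csv.reader(io.StringIO(text), delimiter=sep)
    next(reader)  # like header=True: the first line is not data
    return [{name: convert(value) for (name, convert), value in zip(schema, fields)}
            for fields in reader]

sample = ("payment_id;customer_id;staff_id;rental_id;amount;payment_date\n"
          "16050;269;2;7;1.99;2017-01-24 21:40:00\n")  # hypothetical timestamp
rows = parse_with_schema(sample, payment_schema)
print(rows)
```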

In [12]:
```python
dfPaymentWithSchema = spark.read.csv("s3a://udacity-dend/pagila/payment/payment.csv", sep=";", schema=paymentSchema, header=True)
```

In [13]:
```python
dfPaymentWithSchema.printSchema()
# Note: this previews the earlier `df`, not dfPaymentWithSchema, so the table below still
# shows full timestamps; call dfPaymentWithSchema.show(5) to preview the new DataFrame
df.show(5)
```

```text
root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_date: date (nullable = true)

+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount|        payment_date|
+----------+-----------+--------+---------+------+--------------------+
|     16050|        269|       2|        7|  1.99|2017-01-24 21:40:...|
|     16051|        269|       1|       98|  0.99|2017-01-25 15:16:...|
|     16052|        269|       2|      678|  6.99|2017-01-28 21:44:...|
|     16053|        269|       2|      703|  0.99|2017-01-29 00:58:...|
|     16054|        269|       1|      750|  4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows
```



In [14]:
```python
dfPaymentWithSchema.createOrReplaceTempView("payment")
spark.sql("""
    SELECT month(payment_date) AS m, sum(amount) AS revenue
    FROM payment
    GROUP BY m
    ORDER BY revenue DESC
""").show()
```

```text
+---+------------------+
|  m|           revenue|
+---+------------------+
|  4|28559.460000003943|
|  3|23886.560000002115|
|  2| 9631.879999999608|
|  1| 4824.429999999856|
|  5|  514.180000000001|
+---+------------------+
```



## Demo: Data Lake on EMR

Click the following Udacity YouTube tutorials to watch the demo of a data lake on EMR:

[part 1](https://www.youtube.com/watch?v=539RnL05fGQ)

[part 2](https://www.youtube.com/watch?v=lr-6oUkzs2w)


## Demo: Data Lake on Athena

Click the following Udacity YouTube tutorials to watch the demo of a data lake on Athena:

[part 1](https://www.youtube.com/watch?v=Cy-kJEc5oKE)

[part 2](https://www.youtube.com/watch?v=LWOTnS2wS9U)

## Data Lake Issues
* A data lake is prone to becoming a chaotic **data garbage dump**. Measures and practices such as detailed metadata are being adopted to reduce this risk.
* Since a major feature of the data lake is wide access to cross-department data and relevant external data, **data governance** can be hard to implement: it is difficult to tell who has access to what.
* Finally, it is still sometimes unclear, case by case, whether a data lake should **replace, offload or work in parallel** with a data warehouse or data marts. In all cases, dimensional modeling remains a valuable practice, even in the context of a data lake.

## Launch EMR Cluster and Notebook
Follow the instructions below to launch your EMR cluster and notebook.

* Go to the [Amazon EMR Console](https://console.aws.amazon.com/elasticmapreduce/)
* Select "Clusters" in the menu on the left, and click the "Create cluster" button.

<img src="images/create-cluster-button.png">

### Step 1: Configure your cluster with the following settings:
* Release: `emr-5.20.0` or later
* Applications: `Spark`: Spark 2.4.0 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.0
* Instance type: `m3.xlarge`
* Number of instances: `3`
* EC2 key pair: `Proceed without an EC2 key pair` or feel free to use one if you'd like

You can keep the remaining default settings and click "Create cluster" on the bottom right.
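
The same cluster can also be requested programmatically. The sketch below uses boto3's EMR `run_job_flow` method and mirrors the console choices above: release, the Spark application bundle (Hadoop, Spark, Ganglia, Zeppelin), three `m3.xlarge` instances, and no key pair. The cluster name and region are hypothetical placeholders. The role names assume the default EMR roles (`EMR_DefaultRole`, `EMR_EC2_DefaultRole`) already exist in your account. Calling `launch_cluster` starts billing.

```python
def build_cluster_request(name="spark-data-lake"):  # hypothetical cluster name
    """Build run_job_flow keyword arguments mirroring the console settings above."""
    return {
        "Name": name,
        "ReleaseLabel": "emr-5.20.0",
        "Applications": [{"Name": app} for app in ("Hadoop", "Spark", "Ganglia", "Zeppelin")],
        "Instances": {
            "MasterInstanceType": "m3.xlarge",
            "SlaveInstanceType": "m3.xlarge",
            "InstanceCount": 3,                   # 1 master node + 2 core nodes
            "KeepJobFlowAliveWhenNoSteps": True,  # stay in "Waiting" so a notebook can attach
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }

def launch_cluster(region="us-west-2"):  # hypothetical region
    import boto3  # imported lazily; requires AWS credentials
    emr = boto3.client("emr", region_name=region)
    response = emr.run_job_flow(**build_cluster_request())
    return response["JobFlowId"]
```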

<img src="images/configure-cluster.png">

### Step 2: Wait for Cluster "Waiting" Status
Once you create the cluster, you'll see a status next to your cluster name that says Starting. Wait a short time for this status to change to Waiting before moving on to the next step.

<img src="images/cluster-waiting.png">

### Step 3: Create Notebook
Now that you launched your cluster successfully, let's create a notebook to run Spark on that cluster.

Select "Notebooks" in the menu on the left, and click the "Create notebook" button.

<img src="images/create-notebook-button.png">

### Step 4: Configure your notebook
* Enter a name for your notebook
* Select "Choose an existing cluster" and choose the cluster you just created
* Use the default setting for "AWS service role" - this should be "EMR_Notebooks_DefaultRole" or "Create default role" if you haven't done this before.

You can keep the remaining default settings and click "Create notebook" on the bottom right.

<img src="images/configure-notebook.png">

### Step 5: Wait for Notebook "Ready" Status, Then Open
Once you create an EMR notebook, you'll need to wait a short time before the notebook status changes from Starting or Pending to Ready. Once your notebook status is Ready, click the "Open" button to open the notebook.

<img src="images/notebook-ready.png">

### Start Coding!
Now you can run Spark code for your project in this notebook, which EMR will run on your cluster. On the next page, you'll find starter code to create a Spark session and read in the full 12GB dataset for the DSND Capstone project.

<img src="images/empty-notebook.png">

### Download Notebook
When you are finished with your notebook, click `File` > `Download as` > `Notebook` to download it to your computer.

For more information on EMR notebooks, click [here](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks.html).

## Avoid Paying Unexpected Costs

### Pricing - Be Careful!
From this point on, AWS will charge you for running your EMR cluster. You can find the details on the [EMR Pricing here](https://aws.amazon.com/emr/pricing/). If you run your cluster for a week with the settings specified earlier (3 instances of `m3.xlarge`), you should expect costs to be around $30.

Most importantly, remember to terminate your cluster when you are done. Otherwise, your cluster might run for a day, week, month, or longer without you remembering, and you’ll wind up with a large bill!

### Terminate Your Cluster

You can terminate your cluster while still keeping the Jupyter notebook you created. In EMR, your EMR cluster and EMR notebook are decoupled (so you can reattach your notebook to a different cluster at any time)! To terminate your cluster, click "Clusters" in the menu on the left, check the box next to your cluster to select, and click the "Terminate" button.

<img src="images/terminate-cluster.png">
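
Termination can also be scripted. The sketch below uses boto3's EMR `list_clusters` and `terminate_job_flows` methods. It takes the client as a parameter so it can be exercised without AWS. Terminating every `WAITING` cluster is an assumption that suits a personal sandbox account, not a shared one. Only the first page of `list_clusters` results is handled.

```python
def terminate_waiting_clusters(emr_client):
    """Terminate every cluster currently in the WAITING state and return their IDs."""
    # Only the first page of results is read; pagination via "Marker" is ignored here
    response = emr_client.list_clusters(ClusterStates=["WAITING"])
    cluster_ids = [cluster["Id"] for cluster in response["Clusters"]]
    if cluster_ids:
        emr_client.terminate_job_flows(JobFlowIds=cluster_ids)
    return cluster_ids

# Usage with real credentials (region is a placeholder):
# import boto3
# terminate_waiting_clusters(boto3.client("emr", region_name="us-west-2"))
```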

### Change Cluster for Notebook
If you still have a notebook on EMR and terminated the cluster it was connected to, you can still run that notebook at any time by creating another cluster (following the instructions in the previous section). Once you have the new cluster launched and in "waiting" status, click "Notebooks" in the menu on the left and click on the name of your notebook. Then click the "Change cluster" button.

<img src="images/change-cluster-button.png">

Select your new cluster. Once your notebook reaches "Ready" status, you can now run this notebook on your new cluster.


### Delete Your Notebook
When you've finished with your project and downloaded your notebook, you can delete your notebook from EMR by selecting "Notebooks" in the menu on the left, selecting your notebook, and then clicking "Delete." Make sure to terminate the clusters you launched for this as well.
<img src="images/delete-notebook.png">

### Delete S3 buckets
AWS charges primarily for running instances, so most of the charges will cease once you stop the cluster. However, there are smaller storage charges that continue to accrue if you don't delete your buckets. To delete your buckets, go to the [Amazon S3 console](https://console.aws.amazon.com/s3/). Choose the bucket you want to delete from the list, so that the whole bucket row is selected. Choose delete bucket, type the name of the bucket, and click "Confirm."

For more information about deleting folders and buckets, go to [How Do I Delete an S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/delete-bucket.html) in the Amazon Simple Storage Service Getting Started Guide.
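
A bucket must be empty before it can be deleted. With boto3's S3 resource API, that takes two calls, shown in the sketch below. The bucket name in the usage line is a hypothetical placeholder. Deletion is irreversible, and a versioned bucket would additionally need its object versions removed.

```python
def delete_bucket(s3_resource, bucket_name):
    """Empty a bucket, then delete it. Irreversible."""
    bucket = s3_resource.Bucket(bucket_name)
    bucket.objects.all().delete()  # batch-deletes current objects (not older versions)
    bucket.delete()                # fails if anything is left in the bucket

# Usage with real credentials:
# import boto3
# delete_bucket(boto3.resource("s3"), "my-data-lake-bucket")  # hypothetical bucket name
```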

You can view your billing information at any time by clicking your account name in the upper right corner of the console and going to **My Billing Dashboard**.

<img src="images/aws-billing.png">