# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# install packages (this was actually done from the command line)

#!pip install openmeteo-requests
#!pip install requests-cache retry-requests

#!pip install pandasql

In [1]:
# Do all imports and installs here
import pandas as pd
from pandasql import sqldf

import requests

import openmeteo_requests
import requests_cache
from retry_requests import retry

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### A ENVIRONMENT AGENCY DATA

#### A.01 RIVER LEVEL STATIONS

In [13]:

# all river level stations within `dist` km of Chesterfield (lat, long and dist are parameters of the API endpoint; dist is 10 below)

# CHESTERFIELD LOCATION CO-ORDINATES
# "latitude": 53.25
# "longitude": -1.4167

url = "http://environment.data.gov.uk/flood-monitoring/id/stations?parameter=level&lat=53.25&long=-1.4167&dist=10"

# send a GET request to the API endpoint URL
response = requests.get(url)

# create a dictionary from the response body, which is in JSON format
response_dict = response.json()

# inspection of the dictionary showed that the station-specific data (values) that we need are inside the "items" key, so limit to that
df_river_level_stations_radius = pd.DataFrame.from_dict(response_dict["items"])
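
The query string in the request above could also be assembled from named values rather than typed by hand, which would make it easier to reuse the same request for another town or radius. A minimal sketch using only the standard library; `build_station_url` is a hypothetical helper name, not part of the Environment Agency API, and the parameter names are the ones already used in the URL above.

```python
from urllib.parse import urlencode

BASE_URL = "http://environment.data.gov.uk/flood-monitoring/id/stations"


def build_station_url(parameter, lat, long, dist):
    """Return the stations URL for one measured parameter around a point.

    Hypothetical helper: it only reproduces the query string used in this notebook.
    """
    query = urlencode({"parameter": parameter, "lat": lat, "long": long, "dist": dist})
    return f"{BASE_URL}?{query}"


# Same request as the river level stations cell
chesterfield_url = build_station_url("level", 53.25, -1.4167, 10)
print(chesterfield_url)
```

The resulting string can be passed to `requests.get` exactly as the hand-written URL is.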

In [14]:
df_river_level_stations_radius.head(5)

Unnamed: 0,@id,RLOIid,catchmentName,dateOpened,easting,label,lat,long,measures,northing,notation,riverName,stageScale,stationReference,status,town,wiskiID,gridReference
0,http://environment.data.gov.uk/flood-monitorin...,8114,Don and Rother,2000-04-17,444800,Killamarsh,53.326718,-1.328824,[{'@id': 'http://environment.data.gov.uk/flood...,381300,L0307,River Rother,http://environment.data.gov.uk/flood-monitorin...,L0307,http://environment.data.gov.uk/flood-monitorin...,Killamarsh,L0307,
1,http://environment.data.gov.uk/flood-monitorin...,8217,Don and Rother,1964-01-01,439400,Whittington,53.266027,-1.410737,[{'@id': 'http://environment.data.gov.uk/flood...,374500,F0203,River Rother,http://environment.data.gov.uk/flood-monitorin...,F0203,http://environment.data.gov.uk/flood-monitorin...,Old Whittington,F0203,
2,http://environment.data.gov.uk/flood-monitorin...,8299,Don and Rother,2003-09-23,432321,Dore Twenty Well Lane,53.326618,-1.516187,[{'@id': 'http://environment.data.gov.uk/flood...,381188,L0407,River Sheaf,http://environment.data.gov.uk/flood-monitorin...,L0407,http://environment.data.gov.uk/flood-monitorin...,Dore,L0407,SK323811
3,http://environment.data.gov.uk/flood-monitorin...,8296,Don and Rother,2003-02-16,438744,Chesterfield Tapton Bridge,53.239901,-1.420925,[{'@id': 'http://environment.data.gov.uk/flood...,371588,L0204,River Rother,http://environment.data.gov.uk/flood-monitorin...,L0204,http://environment.data.gov.uk/flood-monitorin...,Chesterfield,L0204,SK387715
4,http://environment.data.gov.uk/flood-monitorin...,8295,Don and Rother,2003-04-14,438078,Chesterfield Park Road Bridge,53.232965,-1.430996,[{'@id': 'http://environment.data.gov.uk/flood...,370811,L0233,River Hipper,http://environment.data.gov.uk/flood-monitorin...,L0233,http://environment.data.gov.uk/flood-monitorin...,Chesterfield,L0233,SK380708


In [15]:
df_river_level_stations_radius.dtypes

@id                  object
RLOIid               object
catchmentName        object
dateOpened           object
easting               int64
label                object
lat                 float64
long                float64
measures             object
northing              int64
notation             object
riverName            object
stageScale           object
stationReference     object
status               object
town                 object
wiskiID              object
gridReference        object
dtype: object

In [16]:
# keep only the columns we need

df_river_level_stations_radius = df_river_level_stations_radius[['label','lat','long','riverName','stationReference','town']].copy()

In [17]:
df_river_level_stations_radius.head(50)

Unnamed: 0,label,lat,long,riverName,stationReference,town
0,Killamarsh,53.326718,-1.328824,River Rother,L0307,Killamarsh
1,Whittington,53.266027,-1.410737,River Rother,F0203,Old Whittington
2,Dore Twenty Well Lane,53.326618,-1.516187,River Sheaf,L0407,Dore
3,Chesterfield Tapton Bridge,53.239901,-1.420925,River Rother,L0204,Chesterfield
4,Chesterfield Park Road Bridge,53.232965,-1.430996,River Hipper,L0233,Chesterfield
5,Dronfield,53.302721,-1.472506,River Drone,L0215,Dronfield
6,Hady Hill,53.233706,-1.418671,River Rother,L0206,Chesterfield
7,Chesterfield,53.231179,-1.454659,River Hipper,L0235,Walton
8,Staveley,53.26654,-1.337263,River Doe Lea,F0103,Woodthorpe
9,Sheepbridge,53.268853,-1.437687,River Whitting,F0220,Sheepbridge
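
As a rough sanity check on the radius search, the distance from the search centre to each returned station can be estimated with the haversine formula. This is a sketch under assumptions: how the API itself measures distance is not stated here, the Earth radius is a mean value, and `haversine_km` is a hypothetical helper name. The coordinates are copied from the table above.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, an approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlambda = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


centre = (53.25, -1.4167)  # search centre used in the stations request
stations = {
    "Whittington": (53.266027, -1.410737),
    "Chesterfield Tapton Bridge": (53.239901, -1.420925),
    "Hady Hill": (53.233706, -1.418671),
}
distances_km = {
    name: haversine_km(*centre, lat, lon) for name, (lat, lon) in stations.items()
}
# Print nearest station first
for name, d in sorted(distances_km.items(), key=lambda item: item[1]):
    print(f"{name}: {d:.1f} km")
```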


In [49]:
# From the Environment Agency map we want the river level stations that are within the CHESTERFIELD urban area,
# so we can use LATITUDE AND LONGITUDE TO BOUND WHAT WE NEED

# Chesterfield: MOST EASTERLY station is Chesterfield Calow Lane (53.225038, -1.398178), Calow Brook
# TODO: the remaining stations and the bounds below are still the Carlisle values and need replacing with Chesterfield ones
# Carlisle: MOST EASTERLY AND SOUTHERLY STATION FROM MAP IS NEWBIGGIN BRIDGE   (54.853151   -2.881322)
# Carlisle: MOST WESTERLY STATION FROM MAP IS CARLISLE RAFFLES AVENUE    (54.892374   -2.963486)
# Carlisle: MOST NORTHERLY STATION FROM MAP IS LINSTOCK   (54.912827  -2.893015)

lat_max=54.912827
lat_min=54.853151
long_max=-2.881322
long_min=-2.963486

sql_query=f"""select * from df_river_level_stations_radius
              where lat between {lat_min} and {lat_max}
              and long between {long_min} and {long_max}
           """

df_river_level_stations_box = sqldf(sql_query)
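
Rather than copying the four bounds by hand, they could be derived from the extreme stations read off the map. The sketch below uses plain Python and the three Carlisle stations named in the comments of the cell above; `bounding_box` and `in_box` are hypothetical helper names, and the inclusive comparison is chosen to mirror SQL `BETWEEN`.

```python
# Extreme stations read off the map: name -> (lat, long)
corner_stations = {
    "Newbiggin Bridge": (54.853151, -2.881322),
    "Carlisle Raffles Avenue": (54.892374, -2.963486),
    "Linstock": (54.912827, -2.893015),
}


def bounding_box(points):
    """Return (lat_min, lat_max, long_min, long_max) enclosing all points."""
    points = list(points)
    lats = [lat for lat, _ in points]
    longs = [lon for _, lon in points]
    return min(lats), max(lats), min(longs), max(longs)


def in_box(lat, lon, box):
    """True when the point lies inside the box, edges included (like SQL BETWEEN)."""
    lat_min, lat_max, long_min, long_max = box
    return lat_min <= lat <= lat_max and long_min <= lon <= long_max


box = bounding_box(corner_stations.values())
print(box)
print(in_box(54.885739, -2.937513, box))  # Denton Holme, a Carlisle station
```

Deriving the box this way keeps the bounds and the station list in one place, so switching to a different town only means changing `corner_stations`.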

###### MAP OF CHESTERFIELD RIVER LEVEL MONITORING POINTS. RED BOX ENCAPSULATES THE ONES WE WANT TO USE
![title](img/ChesterfieldRiverLevelStations.png)

In [50]:
df_river_level_stations_box.head(20)

Unnamed: 0,label,lat,long,riverName,stationReference,town
0,Denton Holme,54.885739,-2.937513,River Caldew,765045,Carlisle
1,"Botcherby Bridge, Carlisle",54.89341,-2.910796,River Petteril,764070,"Botcherby, Carlisle"
2,"Sands Centre, Carlisle",54.898639,-2.932277,River Eden,762600,"Sands Centre, Carlisle"
3,Linstock,54.912827,-2.893015,River Eden,762540,Linstock
4,Cummersdale,54.865626,-2.944447,River Caldew,765013,Cummersdale
5,Harraby Green Business Park,54.879957,-2.917974,River Petteril,764010,Harraby Green Carlisle
6,Melbourne Park Carlisle,54.889748,-2.913769,River Petteril,764020,Melbourne Park Carlisle
7,Skew Bridge,54.891143,-2.9382,River Caldew,765090,Skew Bridge Carlisle
8,Sheepmount,54.90486,-2.952852,River Eden,765512,Carlisle
9,Newbiggin Bridge,54.853151,-2.881322,River Petteril,764050,Carleton


#### A.02 RAINFALL STATIONS

###### MAP OF ENVIRONMENT AGENCY RAINFALL STATIONS ON EDEN CATCHMENT. RED RECTANGLE ENCAPSULATES THE ONES WE WANT TO USE

![title](img/RainfallStationsOnEdenCatchment.png)

In [None]:
# MOST EASTERLY FROM MAP IS ALLENHEADS ALLEN LODGE   ()
# MOST WESTERLY STATION FROM MAP IS KESWICK    ()
# MOST NORTHERLY STATION FROM MAP IS WILLOWHOLME   ()
# MOST SOUTHERLY STATION FROM MAP IS WEST SLEDDALE   ()

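# bounds not yet filled in; set them from the station coordinates once known
lat_max = None
lat_min = None
long_max = None
long_min = None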

In [None]:
url = "http://environment.data.gov.uk/flood-monitoring/id/stations?parameter=rainfall&lat=54.89&long=-2.94&dist=20"

response = requests.request("GET",url)
print(response.text)




In [3]:
# WEATHER DATA FROM THE OPENMETEO SOURCE

# Set up the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after = -1)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)

# Make sure all required weather variables are listed here
# The order of variables in hourly or daily is important to assign them correctly below
url = "https://archive-api.open-meteo.com/v1/archive"

# CARLISLE LOCATION
# CLOSE TO THE 2015 FLOODS
params = {
    "latitude": 54.8951,
    "longitude": -2.9382,
    "start_date": "2015-12-01",
    "end_date": "2015-12-06",
    "hourly": ["temperature_2m", "precipitation", "rain"],
    "daily": ["precipitation_sum", "rain_sum"],
    "timezone": "GMT"
}



responses = openmeteo.weather_api(url, params=params)

# Process first location. Add a for-loop for multiple locations or weather models
response = responses[0]
print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
print(f"Elevation {response.Elevation()} m asl")
print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

# Process hourly data. The order of variables needs to be the same as requested.
hourly = response.Hourly()
hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
hourly_precipitation = hourly.Variables(1).ValuesAsNumpy()
hourly_rain = hourly.Variables(2).ValuesAsNumpy()

hourly_data = {"date": pd.date_range(
    start = pd.to_datetime(hourly.Time(), unit = "s"),
    end = pd.to_datetime(hourly.TimeEnd(), unit = "s"),
    freq = pd.Timedelta(seconds = hourly.Interval()),
    inclusive = "left"
)}
hourly_data["temperature_2m"] = hourly_temperature_2m
hourly_data["precipitation"] = hourly_precipitation
hourly_data["rain"] = hourly_rain

hourly_dataframe = pd.DataFrame(data = hourly_data)
print(hourly_dataframe)

# Process daily data. The order of variables needs to be the same as requested.
daily = response.Daily()
daily_precipitation_sum = daily.Variables(0).ValuesAsNumpy()
daily_rain_sum = daily.Variables(1).ValuesAsNumpy()

daily_data = {"date": pd.date_range(
    start = pd.to_datetime(daily.Time(), unit = "s"),
    end = pd.to_datetime(daily.TimeEnd(), unit = "s"),
    freq = pd.Timedelta(seconds = daily.Interval()),
    inclusive = "left"
)}
daily_data["precipitation_sum"] = daily_precipitation_sum
daily_data["rain_sum"] = daily_rain_sum

daily_dataframe = pd.DataFrame(data = daily_data)
print(daily_dataframe)


Coordinates 54.868186950683594°N -2.857147216796875°E
Elevation 24.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
                   date  temperature_2m  precipitation  rain
0   2015-12-01 00:00:00          -0.068            0.0   0.0
1   2015-12-01 01:00:00          -0.118            0.0   0.0
2   2015-12-01 02:00:00          -0.018            0.0   0.0
3   2015-12-01 03:00:00           0.682            0.0   0.0
4   2015-12-01 04:00:00           1.632            0.0   0.0
..                  ...             ...            ...   ...
139 2015-12-06 19:00:00           4.282            0.0   0.0
140 2015-12-06 20:00:00           3.682            0.0   0.0
141 2015-12-06 21:00:00           3.232            0.0   0.0
142 2015-12-06 22:00:00           3.282            0.0   0.0
143 2015-12-06 23:00:00           3.632            0.0   0.0

[144 rows x 4 columns]
        date  precipitation_sum   rain_sum
0 2015-12-01           9.800000   9.700001
1 2015-12-02           3.700000
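
The `date` column in the cell above is rebuilt from a start time, an end time and an interval, with the end excluded (`inclusive = "left"`). A standard-library sketch of the same idea follows. It assumes the end value marks the end of the last interval, which is consistent with the 144 hourly rows shown above for six days; `left_inclusive_range` is a hypothetical helper name and the epoch values are computed here rather than taken from the API response.

```python
from datetime import datetime, timezone


def left_inclusive_range(start_s, end_s, interval_s):
    """Timestamps from start_s up to, but excluding, end_s (all in epoch seconds)."""
    return [
        datetime.fromtimestamp(t, tz=timezone.utc)
        for t in range(start_s, end_s, interval_s)
    ]


# Assumed values: an hourly series from 2015-12-01 00:00 to the end of 2015-12-06 (UTC)
start_s = int(datetime(2015, 12, 1, tzinfo=timezone.utc).timestamp())
end_s = int(datetime(2015, 12, 7, tzinfo=timezone.utc).timestamp())
hours = left_inclusive_range(start_s, end_s, 3600)
print(len(hours), hours[0], hours[-1])
```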

In [6]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [11]:
#write to parquet
df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Performing cleaning tasks here





### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
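
Two of the checks listed above, a unique-key constraint and a source/count comparison, might be sketched as follows. The helper names are hypothetical and the rows are illustrative, using station references from the river level stations table; a real check would run against the loaded tables.

```python
def check_unique_key(rows, key):
    """Return the key values that occur more than once (empty list means the check passes)."""
    seen, duplicates = set(), []
    for row in rows:
        value = row[key]
        if value in seen and value not in duplicates:
            duplicates.append(value)
        seen.add(value)
    return duplicates


def check_row_count(source_count, loaded_count):
    """Completeness check: every source record should have been loaded."""
    return source_count == loaded_count


# Illustrative rows only
stations = [
    {"stationReference": "L0307", "label": "Killamarsh"},
    {"stationReference": "F0203", "label": "Whittington"},
    {"stationReference": "L0204", "label": "Chesterfield Tapton Bridge"},
]
assert check_unique_key(stations, "stationReference") == []
assert check_row_count(source_count=3, loaded_count=len(stations))
print("quality checks passed")
```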
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.