## Data Ingestion Examples from Public Datasets

![City of LA Data](https://databricks.com/wp-content/uploads/2017/09/Screen-Shot-2017-09-11-at-8.42.06-AM.png)

A data engineer's primary tasks often include:
1. data ingestion
2. data transformation (some ETL to cleanse data)
3. data storage and data access (for downstream consumption)

In this example notebook, we walk through all three steps.

The dataset we are going to use is crime data from [2010 to present from the city of Los Angeles](https://data.lacity.org/A-Safe-City/Crime-Data-from-2010-to-Present/y8tr-7khq).

Other public datasets of interest can be browsed on [data.gov](https://www.data.gov/open-gov/).

In [None]:
dbutils.fs.mkdirs("/mnt/crime_la_data")

# Data Ingestion

In [None]:
%sh wget "https://data.lacity.org/api/views/y8tr-7khq/rows.csv?accessType=DOWNLOAD" -O /tmp/la_crimes.csv
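A URL passed to `wget` should be quoted, since an unquoted `?` is a shell glob character. If you would rather stay in Python, the download can be sketched with the standard library instead. `build_export_url` and `download_csv` are hypothetical helpers, and the assumption that other datasets on the portal follow the same `/api/views/<dataset-id>/rows.csv?accessType=DOWNLOAD` pattern is generalized from this single URL.

```python
import urllib.request
from urllib.parse import urlencode

PORTAL = "https://data.lacity.org"  # open-data portal used in this notebook


def build_export_url(dataset_id: str, portal: str = PORTAL) -> str:
    """Build the CSV export URL for a dataset ID (pattern assumed from the LA crime URL)."""
    query = urlencode({"accessType": "DOWNLOAD"})
    return f"{portal}/api/views/{dataset_id}/rows.csv?{query}"


def download_csv(dataset_id: str, dest: str) -> str:
    """Download the CSV export to a local path on the driver and return that path."""
    # urlretrieve never goes through a shell, so the '?' in the URL needs no quoting
    path, _headers = urllib.request.urlretrieve(build_export_url(dataset_id), dest)
    return path


# Usage (requires network access from the driver):
# download_csv("y8tr-7khq", "/tmp/la_crimes.csv")
```

Either way the file lands on the driver's local disk, which is why the next cell moves it from `file:///tmp/...` into DBFS.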

In [None]:
dbutils.fs.mv("file:///tmp/la_crimes.csv", "/mnt/crime_la_data/la_crimes.csv")

In [None]:
la_crimes_df = spark.read.format('csv').options(header='true', inferSchema='true').load('/mnt/crime_la_data/la_crimes.csv')
la_crimes_df.count()

In [None]:
display(la_crimes_df)

DR Number,Date Reported,Date Occurred,Time Occurred,Area ID,Area Name,Reporting District,Crime Code,Crime Code Description,MO Codes,Victim Age,Victim Sex,Victim Descent,Premise Code,Premise Description,Weapon Used Code,Weapon Description,Status Code,Status Description,Crime Code 1,Crime Code 2,Crime Code 3,Crime Code 4,Address,Cross Street,Location
1208575,03/14/2013,03/11/2013,1800,12,77th Street,1241,626,INTIMATE PARTNER - SIMPLE ASSAULT,0416 0446 1243 2000,30.0,F,W,502,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AO,Adult Other,626,,,,6300 BRYNHURST AV,,"(33.9829, -118.3338)"
102005556,01/25/2010,01/22/2010,2300,20,Olympic,2071,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,VAN NESS,15TH,"(34.0454, -118.3157)"
418,03/19/2013,03/18/2013,2030,18,Southeast,1823,510,VEHICLE - STOLEN,,12.0,,,101,STREET,,,IC,Invest Cont,510,,,,200 E 104TH ST,,"(33.942, -118.2717)"
101822289,11/11/2010,11/10/2010,1800,18,Southeast,1803,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,88TH,WALL,"(33.9572, -118.2717)"
42104479,01/11/2014,01/04/2014,2300,21,Topanga,2133,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),0329,84.0,M,W,501,SINGLE FAMILY DWELLING,,,IC,Invest Cont,745,,,,7200 CIRRUS WY,,"(34.2009, -118.6369)"
120125367,01/08/2013,01/08/2013,1400,1,Central,111,110,CRIMINAL HOMICIDE,1243 2000 1813 1814 2002 0416 0400,49.0,F,W,501,SINGLE FAMILY DWELLING,400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AA,Adult Arrest,110,,,,600 N HILL ST,,"(34.0591, -118.2412)"
101105609,01/28/2010,01/27/2010,2230,11,Northeast,1125,510,VEHICLE - STOLEN,,,,,108,PARKING LOT,,,IC,Invest Cont,510,,,,YORK,AVENUE 51,"(34.1211, -118.2048)"
101620051,11/11/2010,11/07/2010,1600,16,Foothill,1641,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,EL DORADO,TRUESDALE,"(34.241, -118.3987)"
101910498,04/07/2010,04/07/2010,1600,19,Mission,1902,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,GLENOAKS,DRELL,"(34.3147, -118.4589)"
120908292,03/29/2013,01/15/2013,800,9,Van Nuys,904,668,"EMBEZZLEMENT, GRAND THEFT ($950.01 & OVER)",0344 1300,27.0,F,O,203,OTHER BUSINESS,,,IC,Invest Cont,668,,,,7200 SEPULVEDA BL,,"(34.2012, -118.4662)"


In [None]:
la_crimes_df.printSchema()

In [None]:
la_crimes_df.columns

# Data Transformation: ETL for Cleansing Data
Let's do some ETL to convert the date columns to proper timestamps and rename the columns to remove spaces. Spark's Parquet writer (in the Spark versions this notebook targets) rejects column names that contain spaces (" "), so before we can store our DataFrames as Parquet files we need to rename these columns. We also need to convert the `MM/dd/yyyy` date strings into timestamps.
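Renaming every column by hand works, but the rule being applied is mechanical: strip surrounding whitespace and delete the characters Parquet rejects. A minimal sketch of that rule in plain Python follows. The character set is the one older Spark releases list in their "contains invalid character(s)" error for Parquet field names (an assumption to check against your Spark version), and the helper names are hypothetical.

```python
from typing import Iterable, List

# Characters older Spark releases refuse in Parquet field names (assumed set).
INVALID_PARQUET_CHARS = set(" ,;{}()\n\t=")


def sanitize_column_name(name: str) -> str:
    """Strip surrounding whitespace, then remove every character Parquet rejects."""
    return "".join(ch for ch in name.strip() if ch not in INVALID_PARQUET_CHARS)


def sanitize_columns(columns: Iterable[str]) -> List[str]:
    """Sanitize all names and fail loudly if two of them end up identical."""
    cleaned = [sanitize_column_name(c) for c in columns]
    if len(set(cleaned)) != len(cleaned):
        raise ValueError(f"sanitized column names collide: {cleaned}")
    return cleaned
```

Applied in the notebook, something like `la_crimes_df.toDF(*sanitize_columns(la_crimes_df.columns))` should yield the same names as the explicit `withColumnRenamed` chain, because `DataFrame.toDF` takes the new column names positionally. The explicit chain is more verbose but keeps each mapping visible.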

In [None]:
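# Import only what we use; a wildcard import would shadow Python builtins such as sum, min and max
from pyspark.sql.functions import unix_timestamp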

In [None]:
new_la_crimes_df = la_crimes_df.withColumnRenamed('DR Number', 'DRNumber')\
  .withColumnRenamed('Date Reported', 'DateReported') \
  .withColumnRenamed('Date Occurred', 'DateOccurred') \
  .withColumnRenamed('Time Occurred', 'TimeOccurred') \
  .withColumnRenamed('Area ID', 'AreaID') \
  .withColumnRenamed('Area Name','AreaName') \
  .withColumnRenamed('Reporting District', 'ReportingDistrict') \
  .withColumnRenamed('Crime Code', 'CrimeCode') \
  .withColumnRenamed('Crime Code Description', 'CrimeCodeDescription') \
  .withColumnRenamed('MO Codes', 'MOCodes') \
  .withColumnRenamed('Victim Age', 'VictimAge') \
  .withColumnRenamed('Victim Sex', 'VictimSex') \
  .withColumnRenamed('Victim Descent', 'VictimDescent') \
  .withColumnRenamed('Premise Code', 'PremiseCode') \
  .withColumnRenamed('Premise Description', 'PremiseDescription') \
  .withColumnRenamed('Weapon Used Code','WeaponUsedCode') \
  .withColumnRenamed('Weapon Description','WeaponDescription') \
  .withColumnRenamed('Status Code', 'StatusCode') \
  .withColumnRenamed('Status Description', 'StatusDescription') \
  .withColumnRenamed('Crime Code 1', 'CrimeCode1') \
  .withColumnRenamed('Crime Code 2', 'CrimeCode2') \
  .withColumnRenamed('Crime Code 3', 'CrimeCode3') \
  .withColumnRenamed('Crime Code 4', 'CrimeCode4') \
  .withColumnRenamed('Cross Street', 'CrossStreet') \
  .withColumnRenamed('Location ', 'Location')

In [None]:
new_la_crimes_df.columns

In [None]:
display(new_la_crimes_df)

DRNumber,DateReported,DateOccurred,TimeOccurred,AreaID,AreaName,ReportingDistrict,CrimeCode,CrimeCodeDescription,MOCodes,VictimAge,VictimSex,VictimDescent,PremiseCode,PremiseDescription,WeaponUsedCode,WeaponDescription,StatusCode,StatusDescription,CrimeCode1,CrimeCode2,CrimeCode3,CrimeCode4,Address,CrossStreet,Location
1208575,03/14/2013,03/11/2013,1800,12,77th Street,1241,626,INTIMATE PARTNER - SIMPLE ASSAULT,0416 0446 1243 2000,30.0,F,W,502,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AO,Adult Other,626,,,,6300 BRYNHURST AV,,"(33.9829, -118.3338)"
102005556,01/25/2010,01/22/2010,2300,20,Olympic,2071,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,VAN NESS,15TH,"(34.0454, -118.3157)"
418,03/19/2013,03/18/2013,2030,18,Southeast,1823,510,VEHICLE - STOLEN,,12.0,,,101,STREET,,,IC,Invest Cont,510,,,,200 E 104TH ST,,"(33.942, -118.2717)"
101822289,11/11/2010,11/10/2010,1800,18,Southeast,1803,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,88TH,WALL,"(33.9572, -118.2717)"
42104479,01/11/2014,01/04/2014,2300,21,Topanga,2133,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),0329,84.0,M,W,501,SINGLE FAMILY DWELLING,,,IC,Invest Cont,745,,,,7200 CIRRUS WY,,"(34.2009, -118.6369)"
120125367,01/08/2013,01/08/2013,1400,1,Central,111,110,CRIMINAL HOMICIDE,1243 2000 1813 1814 2002 0416 0400,49.0,F,W,501,SINGLE FAMILY DWELLING,400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AA,Adult Arrest,110,,,,600 N HILL ST,,"(34.0591, -118.2412)"
101105609,01/28/2010,01/27/2010,2230,11,Northeast,1125,510,VEHICLE - STOLEN,,,,,108,PARKING LOT,,,IC,Invest Cont,510,,,,YORK,AVENUE 51,"(34.1211, -118.2048)"
101620051,11/11/2010,11/07/2010,1600,16,Foothill,1641,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,EL DORADO,TRUESDALE,"(34.241, -118.3987)"
101910498,04/07/2010,04/07/2010,1600,19,Mission,1902,510,VEHICLE - STOLEN,,,,,101,STREET,,,IC,Invest Cont,510,,,,GLENOAKS,DRELL,"(34.3147, -118.4589)"
120908292,03/29/2013,01/15/2013,800,9,Van Nuys,904,668,"EMBEZZLEMENT, GRAND THEFT ($950.01 & OVER)",0344 1300,27.0,F,O,203,OTHER BUSINESS,,,IC,Invest Cont,668,,,,7200 SEPULVEDA BL,,"(34.2012, -118.4662)"


In [None]:
from_pattern = 'MM/dd/yyyy'  # Spark datetime pattern: MM = month, dd = day of month, yyyy = year
# Parse each date string to epoch seconds, cast that to a timestamp, then drop the raw string column
new_la_crimes_ts_df = new_la_crimes_df \
  .withColumn('DateReportedTS', unix_timestamp('DateReported', from_pattern).cast('timestamp')) \
  .drop('DateReported') \
  .withColumn('DateOccurredTS', unix_timestamp('DateOccurred', from_pattern).cast('timestamp')) \
  .drop('DateOccurred')

In [None]:
new_la_crimes_ts_df.printSchema()

In [None]:
display(new_la_crimes_ts_df.select('DateReportedTS', 'DateOccurredTS'))

DateReportedTS,DateOccurredTS
2013-03-14T00:00:00.000+0000,2013-03-11T00:00:00.000+0000
2010-01-25T00:00:00.000+0000,2010-01-22T00:00:00.000+0000
2013-03-19T00:00:00.000+0000,2013-03-18T00:00:00.000+0000
2010-11-11T00:00:00.000+0000,2010-11-10T00:00:00.000+0000
2014-01-11T00:00:00.000+0000,2014-01-04T00:00:00.000+0000
2013-01-08T00:00:00.000+0000,2013-01-08T00:00:00.000+0000
2010-01-28T00:00:00.000+0000,2010-01-27T00:00:00.000+0000
2010-11-11T00:00:00.000+0000,2010-11-07T00:00:00.000+0000
2010-04-07T00:00:00.000+0000,2010-04-07T00:00:00.000+0000
2013-03-29T00:00:00.000+0000,2013-01-15T00:00:00.000+0000
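The pattern `MM/dd/yyyy` uses Java-style date pattern letters, where case matters: `MM` is the month while `mm` would be minutes. One way to check what the conversion should produce, outside Spark, is the equivalent Python `strptime` format `%m/%d/%Y`. The sketch below is only a cross-check for this one pattern, and `parse_report_date` is a hypothetical helper.

```python
from datetime import datetime
from typing import Optional

# Python equivalent of the Spark pattern 'MM/dd/yyyy' used above.
PY_FORMAT = "%m/%d/%Y"


def parse_report_date(value: Optional[str]) -> Optional[datetime]:
    """Parse a 'MM/dd/yyyy' string to a midnight datetime; None for blank or bad input."""
    if value is None or not value.strip():
        return None
    try:
        return datetime.strptime(value.strip(), PY_FORMAT)
    except ValueError:
        # Similar in spirit to unix_timestamp returning null for unparseable strings
        return None
```

Because the source strings carry no time of day, every parsed value is midnight, which matches the `T00:00:00` values in the output above; the time of the incident lives separately in `TimeOccurred`, which appears to be an `HHMM` integer (for example `1800`).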


# Data Storage

Now that we are satisfied with the format and the transformations, we can save the result as Parquet files and create a table on top of them.

In [None]:
dbutils.fs.mkdirs("/mnt/parquet/crime_la_data")

In [None]:
new_la_crimes_ts_df.write.mode("overwrite").format("parquet").save("/mnt/parquet/crime_la_data")

In [None]:
%fs ls /mnt/parquet/crime_la_data

path,name,size
dbfs:/mnt/parquet/crime_la_data/_SUCCESS,_SUCCESS,0
dbfs:/mnt/parquet/crime_la_data/_committed_2264946038564679545,_committed_2264946038564679545,4440
dbfs:/mnt/parquet/crime_la_data/_started_2264946038564679545,_started_2264946038564679545,0
dbfs:/mnt/parquet/crime_la_data/part-00000-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00000-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,728892
dbfs:/mnt/parquet/crime_la_data/part-00001-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00001-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,715803
dbfs:/mnt/parquet/crime_la_data/part-00002-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00002-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,722726
dbfs:/mnt/parquet/crime_la_data/part-00003-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00003-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,763481
dbfs:/mnt/parquet/crime_la_data/part-00004-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00004-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,780622
dbfs:/mnt/parquet/crime_la_data/part-00005-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00005-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,773886
dbfs:/mnt/parquet/crime_la_data/part-00006-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,part-00006-tid-2264946038564679545-1e6f71f1-7f17-4f1c-a16a-92702d893d76-0-c000.gz.parquet,723642
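The listing mixes the seven `part-*.gz.parquet` data files (gzip-compressed, judging by the suffix) with small bookkeeping files: `_SUCCESS` from the output committer and, on Databricks, `_started_<id>`/`_committed_<id>` markers from its transactional commit protocol. Spark's file readers skip names that start with `_` or `.`. If you process such a listing yourself, for instance from `dbutils.fs.ls`, whose entries expose `name` and `size`, a sketch like the following separates the two. It works on plain `(name, size)` pairs so it runs without Spark, and the helper names are hypothetical.

```python
from typing import Iterable, List, Tuple

Entry = Tuple[str, int]  # (file name, size in bytes)


def parquet_part_files(listing: Iterable[Entry]) -> List[Entry]:
    """Keep only Parquet data files; names starting with '_' or '.' are markers."""
    return [
        (name, size)
        for name, size in listing
        if not name.startswith(("_", ".")) and name.endswith(".parquet")
    ]


def total_bytes(listing: Iterable[Entry]) -> int:
    """Sum the sizes of the Parquet data files in a directory listing."""
    return sum(size for _name, size in parquet_part_files(listing))
```

For the listing above, the seven part files add up to 5,209,052 bytes (about 5.2 MB); the marker files contribute nothing to the data.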


In [None]:
%sql
CREATE TEMPORARY VIEW la_crimes
USING PARQUET
OPTIONS (path "/mnt/parquet/crime_la_data")

Notice that the schema is preserved: Parquet files store it alongside the data, so the table inherits the column names and types without us declaring them.

In [None]:
%sql describe la_crimes

col_name,data_type,comment
DRNumber,int,
TimeOccurred,int,
AreaID,int,
AreaName,string,
ReportingDistrict,int,
CrimeCode,int,
CrimeCodeDescription,string,
MOCodes,string,
VictimAge,int,
VictimSex,string,


### Q1: Find BURGLARY crimes (crime code 310)

In [None]:
%sql select * from la_crimes where CrimeCode = 310 limit 100

### Q2: Which LA area has the highest number of crimes?

In [None]:
%sql 
select AreaID, count(1) as count
from la_crimes 
group by AreaID 
order by count desc

AreaID,count
12,110324
3,101989
15,86177
14,83526
18,83370
19,80098
11,76469
9,75347
13,74206
17,73874


### Q3: What gender is the victim of most crimes?

In [None]:
%sql select VictimSex, count(1) as count
from la_crimes
group by VictimSex
order by count desc

VictimSex,count
M,737912
F,673832
,144865
X,23920
H,53
-,1
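Besides M and F, the result contains a blank group plus the codes X, H and `-`. Treating all of these as "unknown" is an assumption made here, not something the query establishes. The sketch below pools everything that is not M or F into one bucket, using the counts shown above; `collapse_victim_sex` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Optional

KNOWN = {"M": "Male", "F": "Female"}


def collapse_victim_sex(counts: Dict[Optional[str], int]) -> Dict[str, int]:
    """Label M/F and pool every other code (X, H, '-', blank or null) as Unknown."""
    collapsed: Counter = Counter()
    for code, n in counts.items():
        collapsed[KNOWN.get((code or "").strip(), "Unknown")] += n
    return dict(collapsed)


# Counts copied from the Q3 result above ('' stands for the blank group).
q3 = {"M": 737912, "F": 673832, "": 144865, "X": 23920, "H": 53, "-": 1}
summary = collapse_victim_sex(q3)
```

With these counts, `summary` is `{'Male': 737912, 'Female': 673832, 'Unknown': 168839}`: male victims still lead, but roughly 10.7% of the 1,580,583 records carry no usable M/F value.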


# Cleanup

In [None]:
%sql DROP VIEW IF EXISTS la_crimes

In [None]:
dbutils.fs.rm("/mnt/parquet/crime_la_data", True)