In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
from pyspark.sql import types
from pyspark.sql import functions as F

In [3]:
import os
import pandas as pd
import pendulum as pdl

In [4]:
from city_vars import dict_cities

# inputs
- city
- fname

In [5]:
# Notebook inputs: `city` selects the per-city settings entry; `fname` is the raw file name.
# NOTE(review): `fname` is left empty here, and the CSV read cell further down
# hard-codes its own file name instead of using it — confirm which is intended.
city = 'San Francisco'
fname = ''

In [6]:
# Cities this notebook knows about, plus a lower-case, underscore-separated
# variant of each name.
cities = ['Chicago', 'San Francisco', 'Los Angeles', 'Austin']
f_cities = [name.lower().replace(' ', '_') for name in cities]

# Settings entry for the city chosen in the inputs cell.
dict_city = dict_cities[city]

In [7]:
# Bucket prefix for all reads/writes, taken from the environment.
# os.getenv returns None when the variable is unset, which would yield paths starting with "None/".
gcs_bkt = os.getenv('GCP_GCS_BUCKET')

In [8]:
# Jar location and credentials file path, both read from the environment.
jar_path = os.getenv('JAR_FILE_LOC')
creds_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')

# Spark configuration: cluster master, application name and extra jar(s).
conf = SparkConf()
conf.setMaster('spark://project-spark-1:7077')
conf.setAppName('proj_file_read')
conf.set("spark.jars", jar_path)

### Run the next cell only if a SparkContext is already running:
`sc.stop()`

In [None]:
# Stop a SparkContext left over from an earlier run, if there is one.
# Guarded so that a fresh kernel (where `sc` does not exist yet) can run
# this cell without raising NameError.
if 'sc' in globals():
    sc.stop()

In [9]:
# Start the SparkContext with the configuration built in the previous cell.
sc = SparkContext(conf=conf)

22/11/04 12:03:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [10]:
# Hadoop configuration of the running context; the `gs` filesystem classes and
# the service-account key file are registered on it below.
hconf = sc._jsc.hadoopConfiguration()

gcs_settings = [
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", creds_path),
    ("fs.gs.auth.service.account.enable", "true"),
]
for setting_key, setting_value in gcs_settings:
    hconf.set(setting_key, setting_value)

In [10]:
# SparkSession built from the context's configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

***

### first, open dataset page and check data dictionary on columns

### One-time sample download: read it with pandas to infer the schema used for all other files of that city:

command:
`!wget https://data.cityofchicago.org/api/views/hx8q-mf9v/rows.csv?accessType=DOWNLOAD`

note: file is `Crimes_-_2012.csv`

output:
```
--2022-10-22 08:53:42--  https://data.cityofchicago.org/api/views/hx8q-mf9v/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.68.26, 52.206.140.199
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [          <=>       ]  75.99M  2.54MB/s    in 28s     

2022-10-22 08:54:11 (2.68 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [79677853]
```

In [12]:
# Download the dataset as CSV. wget names the file after the URL
# (`rows.csv?accessType=DOWNLOAD`, with a numeric suffix when that name already exists).
!wget https://data.lacity.org/api/views/63jg-8b9z/rows.csv?accessType=DOWNLOAD

--2022-10-27 14:12:57--  https://data.lacity.org/api/views/63jg-8b9z/rows.csv?accessType=DOWNLOAD
Resolving data.lacity.org (data.lacity.org)... 52.206.140.205, 52.206.68.26, 52.206.140.199
Connecting to data.lacity.org (data.lacity.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD.3’

rows.csv?accessType     [           <=>      ] 511.52M  5.12MB/s    in 99s     

2022-10-27 14:14:36 (5.16 MB/s) - ‘rows.csv?accessType=DOWNLOAD.3’ saved [536364291]



### check count here: raw csv file
command:
`!wc -l rows.csv?accessType=DOWNLOAD`

chicago1: `485854 rows.csv?accessType=DOWNLOAD`
chicago12: `336247 rows.csv?accessType=DOWNLOAD`
austin: `35098 rows.csv?accessType=DOWNLOAD.1`
los angeles: `2119798 rows.csv?accessType=DOWNLOAD.3`
san francisco: `2129526 rows.csv?accessType=DOWNLOAD.2`

In [1]:
# Line count of the raw CSV, for comparison with the Spark row count later on.
!wc -l rows.csv?accessType=DOWNLOAD.4

657573 rows.csv?accessType=DOWNLOAD.4


In [11]:
# Read only the first 1000 rows with pandas and list the column names.
df_pd = pd.read_csv('rows.csv?accessType=DOWNLOAD.4', nrows=1000)
df_pd.columns

Index(['Incident Datetime', 'Incident Date', 'Incident Time', 'Incident Year',
       'Incident Day of Week', 'Report Datetime', 'Row ID', 'Incident ID',
       'Incident Number', 'CAD Number', 'Report Type Code',
       'Report Type Description', 'Filed Online', 'Incident Code',
       'Incident Category', 'Incident Subcategory', 'Incident Description',
       'Resolution', 'Intersection', 'CNN', 'Police District',
       'Analysis Neighborhood', 'Supervisor District', 'Latitude', 'Longitude',
       'Point', 'Neighborhoods', 'ESNCAG - Boundary File',
       'Central Market/Tenderloin Boundary Polygon - Updated',
       'Civic Center Harm Reduction Project Boundary',
       'HSOC Zones as of 2018-06-05', 'Invest In Neighborhoods (IIN) Areas',
       'Current Supervisor Districts', 'Current Police Districts'],
      dtype='object')

### see sample of data
Command: `df_pd`

In [12]:
# Remove pandas' limit on displayed columns, then show the sample frame.
pd.set_option('display.max_columns', None)
df_pd

Unnamed: 0,Incident Datetime,Incident Date,Incident Time,Incident Year,Incident Day of Week,Report Datetime,Row ID,Incident ID,Incident Number,CAD Number,Report Type Code,Report Type Description,Filed Online,Incident Code,Incident Category,Incident Subcategory,Incident Description,Resolution,Intersection,CNN,Police District,Analysis Neighborhood,Supervisor District,Latitude,Longitude,Point,Neighborhoods,ESNCAG - Boundary File,Central Market/Tenderloin Boundary Polygon - Updated,Civic Center Harm Reduction Project Boundary,HSOC Zones as of 2018-06-05,Invest In Neighborhoods (IIN) Areas,Current Supervisor Districts,Current Police Districts
0,2021/07/25 12:00:00 AM,2021/07/25,00:00,2021,Sunday,2021/07/25 01:41:00 PM,105718906372,1057189,216105573,,II,Coplogic Initial,True,6372,Larceny Theft,Larceny Theft - Other,"Theft, Other Property, $50-$200",Open or Active,,,Southern,,,,,,,,,,,,,
1,2022/06/28 11:58:00 PM,2022/06/28,23:58,2022,Tuesday,2022/06/28 11:58:00 PM,116554371012,1165543,220264913,,VS,Vehicle Supplement,,71012,Other Offenses,Other Offenses,"License Plate, Recovered",Open or Active,,,Out of SF,,,,,,,,,,,,,
2,2022/03/11 10:30:00 AM,2022/03/11,10:30,2022,Friday,2022/03/11 08:03:00 PM,113048071000,1130480,226040232,,II,Coplogic Initial,True,71000,Lost Property,Lost Property,Lost Property,Open or Active,,,Central,,,,,,,,,,,,,
3,2021/05/15 05:47:00 PM,2021/05/15,17:47,2021,Saturday,2021/05/15 05:47:00 PM,103051807043,1030518,210183345,,VS,Vehicle Supplement,,7043,Recovered Vehicle,Recovered Vehicle,"Vehicle, Recovered, Motorcycle",Open or Active,,,Out of SF,,,,,,,,,,,,,
4,2022/06/28 05:22:00 PM,2022/06/28,17:22,2022,Tuesday,2022/06/28 05:22:00 PM,116535107041,1165351,220361741,,VS,Vehicle Supplement,,7041,Recovered Vehicle,Recovered Vehicle,"Vehicle, Recovered, Auto",Open or Active,,,Out of SF,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,2021/08/04 04:30:00 PM,2021/08/04,16:30,2021,Wednesday,2021/08/05 09:23:00 AM,105728307021,1057283,210496063,212170912.0,VI,Vehicle Initial,,7021,Motor Vehicle Theft,Motor Vehicle Theft,"Vehicle, Stolen, Auto",Open or Active,REVERE AVE \ CRISP RD,20076000.0,Bayview,Bayview Hunters Point,10.0,37.727217,-122.382320,POINT (-122.38232027891799 37.72721672799317),78.0,,,,,,9.0,2.0
996,2021/08/04 06:00:00 PM,2021/08/04,18:00,2021,Wednesday,2021/08/05 07:48:00 AM,105730705153,1057307,210495974,212170634.0,II,Initial,,5153,Burglary,Burglary - Other,"Burglary, Non-residential, Unlawful Entry",Open or Active,CHESTNUT ST \ PIERCE ST,26964000.0,Northern,Marina,2.0,37.800403,-122.439504,POINT (-122.43950417286405 37.80040342900427),17.0,,,,,,6.0,4.0
997,2021/08/05 03:00:00 AM,2021/08/05,03:00,2021,Thursday,2021/08/05 03:43:00 PM,105739364070,1057393,210497005,212171871.0,II,Initial,,64070,Suspicious Occ,Suspicious Occ,Suspicious Occurrence,Open or Active,STOCKTON ST \ CAMPTON PL,24907000.0,Central,Financial District/South Beach,3.0,37.788986,-122.406865,POINT (-122.40686503276359 37.788986292481376),19.0,,,,,,3.0,6.0
998,2021/04/25 07:50:00 AM,2021/04/25,07:50,2021,Sunday,2021/04/26 01:38:00 PM,103098428150,1030984,216054986,,II,Coplogic Initial,True,28150,Malicious Mischief,Vandalism,"Malicious Mischief, Vandalism to Property",Open or Active,,,Southern,,,,,,,,,,,,,


In [21]:
# Columns kept when re-reading the sample (the problematic ones are left out).
cols = [
    'Incident Datetime',
    'Incident Date',
    'Incident Time',
    'Incident Year',
    'Incident Day of Week',
    'Report Datetime',
    'Row ID',
    'Incident ID',
    'Incident Number',
    'CAD Number',
    'Report Type Code',
    'Report Type Description',
    'Incident Code',
    'Incident Category',
    'Incident Subcategory',
    'Incident Description',
    'Resolution',
    'CNN',
    'Police District',
    'Supervisor District',
    'Latitude',
    'Longitude',
    'Neighborhoods',
    'ESNCAG - Boundary File',
    'Central Market/Tenderloin Boundary Polygon - Updated',
    'Civic Center Harm Reduction Project Boundary',
    'HSOC Zones as of 2018-06-05',
    'Invest In Neighborhoods (IIN) Areas',
    'Current Supervisor Districts',
    'Current Police Districts',
]
df_pd = pd.read_csv(
    'rows.csv?accessType=DOWNLOAD.4',
    nrows=1000,
    usecols=cols,
)

### initial commands for all (1 cell each)
Commands:
```
df_pd = pd.read_csv('rows.csv?accessType=DOWNLOAD.1', nrows=1000)
df_pd.columns
```
get output, then:
```
spark.createDataFrame(df_pd).schema
```
if this raises an error, then:
```
cols = <PASTE COLUMN LIST HERE, REMOVE PROBLEMATIC COL>

df_pd = pd.read_csv('<EDIT FILENAME>', nrows=1000, usecols=cols)
```

### for Chicago because of `TypeError: Can not merge type (pandas string to spark double) for 'Location Description', 'location' fields`
```
cols = ['Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Arrest', 'Domestic', 'Beat', 'Ward', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Latitude', 'Longitude']
```

### for Los Angeles because of `TypeError: Can not merge type (pandas string/double to spark double/string) for 'Cross Street', 'Weapon Desc', 'Mocodes', 'Vict Sex', 'Vict Descent'`
Commands:
```
cols = ['DR_NO', 'Date Rptd', 'DATE OCC', 'TIME OCC', 'AREA ', 'AREA NAME', 'Rpt Dist No', 'Part 1-2', 'Crm Cd', 'Crm Cd Desc', 'Vict Age', 'Premis Cd', 'Premis Desc', 'Weapon Used Cd', 'Status', 'Status Desc', 'Crm Cd 1', 'Crm Cd 2', 'Crm Cd 3', 'Crm Cd 4', 'LOCATION', 'LAT', 'LON']
```

### for Austin because of `TypeError: Can not merge type (pandas string to spark double) for 'Clearance Status', 'Clearance Date', 'GO Location' fields`
Commands:
```
cols = ['GO Primary Key', 'Council District', 'GO Highest Offense Desc', 'Highest NIBRS/UCR Offense Description', 'GO Report Date', 'GO District', 'GO Location Zip', 'GO Census Tract', 'GO X Coordinate', 'GO Y Coordinate']
```

In [22]:
# Let Spark infer a schema from the pandas sample.
# NOTE(review): the recorded output is a TypeError on 'Incident Category', which is still in `cols`.
spark.createDataFrame(df_pd).schema

TypeError: field Incident Category: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

### Edit the schema printed above, adding back the columns that were removed (use the earlier sample output to choose their types), then use it as the schema template in the read below

### Replace the cell below with this template:
```
df_csv = spark.read \
    .option("header", "true") \
    .schema(schema_template) \
    .csv(f'{gcs_bkt}/raw/{city}/{fname}')
```

In [61]:
# Read the raw CSV from the bucket with the city's predefined schema; the first line is the header.
df_csv = (
    spark.read
    .option("header", "true")
    .schema(dict_city['schema_template'])
    .csv(f'{gcs_bkt}/raw/{city}/' + 'Crime_Data_from_2010_to_2019.csv')
)

In [63]:
# Peek at the first ten rows to confirm the schema was applied as expected.
df_csv.head(10)

[Row(DR_NO=1307355, Date Rptd='02/20/2010 12:00:00 AM', DATE OCC='02/20/2010 12:00:00 AM', TIME OCC=1350, AREA =13, AREA NAME='Newton', Rpt Dist No=1385, Part 1-2=2, Crm Cd=900, Crm Cd Desc='VIOLATION OF COURT ORDER', Mocodes='0913 1814 2000', Vict Age=48, Vict Sex='M', Vict Descent='H', Premis Cd=501, Premis Desc='SINGLE FAMILY DWELLING', Weapon Used Cd=None, Weapon Desc=None, Status='AA', Status Desc='Adult Arrest', Crm Cd 1=900, Crm Cd 2=None, Crm Cd 3=None, Crm Cd 4=None, LOCATION='300 E  GAGE                         AV', Cross Street=None, LAT=33.98249816894531, LON=-118.26950073242188),
 Row(DR_NO=11401303, Date Rptd='09/13/2010 12:00:00 AM', DATE OCC='09/12/2010 12:00:00 AM', TIME OCC=45, AREA =14, AREA NAME='Pacific', Rpt Dist No=1485, Part 1-2=2, Crm Cd=740, Crm Cd Desc='VANDALISM - FELONY ($400 & OVER, ALL CHURCH VANDALISMS)', Mocodes='0329', Vict Age=0, Vict Sex='M', Vict Descent='W', Premis Cd=101, Premis Desc='STREET', Weapon Used Cd=None, Weapon Desc=None, Status='IC', St

In [59]:
# Row count of the loaded frame; compare with the `wc -l` figure of the raw CSV file.
df_csv.count()

                                                                                

2119797

### check count here: original df
Command:
`df_csv.count()`

Output:
`485853`

### inspect data
Command:
```
df_csv.head(10)
```

In [65]:
def parse_dt(dt_str):
    """
    Parse a datetime from a date string in the selected city's date format.

    Parameters
    ----------
    dt_str : str or None
        Raw date string, parsed with the pattern in ``dict_city['date_format']``.

    Returns
    -------
    datetime or None
        The value returned by ``pendulum.from_format``, or ``None`` when
        ``dt_str`` is ``None``.
    """
    # A null cell reaches a Python UDF as None; pass it through as null
    # instead of letting from_format raise and abort the whole Spark job.
    if dt_str is None:
        return None
    return pdl.from_format(dt_str, dict_city['date_format'])


# Spark UDF wrapper so parse_dt can be applied to a string column.
parse_dt_udf = F.udf(parse_dt, returnType=types.TimestampType())

In [66]:
# parse datetime out of provided date column
df_time = df_csv.withColumn('Timestamp', parse_dt_udf(F.col(dict_city['date_string_col'])))

if dict_city['with_year_col']:
    years_rows = df_time \
        .select('Year')
else:
    years_rows = df_time \
        .select(F.year('Timestamp').alias('Year'))

In [67]:
# Check that the new 'Timestamp' column was parsed for the first ten rows.
df_time.head(10)

                                                                                

[Row(DR_NO=1307355, Date Rptd='02/20/2010 12:00:00 AM', DATE OCC='02/20/2010 12:00:00 AM', TIME OCC=1350, AREA =13, AREA NAME='Newton', Rpt Dist No=1385, Part 1-2=2, Crm Cd=900, Crm Cd Desc='VIOLATION OF COURT ORDER', Mocodes='0913 1814 2000', Vict Age=48, Vict Sex='M', Vict Descent='H', Premis Cd=501, Premis Desc='SINGLE FAMILY DWELLING', Weapon Used Cd=None, Weapon Desc=None, Status='AA', Status Desc='Adult Arrest', Crm Cd 1=900, Crm Cd 2=None, Crm Cd 3=None, Crm Cd 4=None, LOCATION='300 E  GAGE                         AV', Cross Street=None, LAT=33.98249816894531, LON=-118.26950073242188, Timestamp=datetime.datetime(2010, 2, 20, 0, 0)),
 Row(DR_NO=11401303, Date Rptd='09/13/2010 12:00:00 AM', DATE OCC='09/12/2010 12:00:00 AM', TIME OCC=45, AREA =14, AREA NAME='Pacific', Rpt Dist No=1485, Part 1-2=2, Crm Cd=740, Crm Cd Desc='VANDALISM - FELONY ($400 & OVER, ALL CHURCH VANDALISMS)', Mocodes='0329', Vict Age=0, Vict Sex='M', Vict Descent='W', Premis Cd=101, Premis Desc='STREET', Weapon

In [68]:
# Distinct, non-null years present in the data, in ascending order.
# The collected rows get their own name so `years_rows` stays a DataFrame and
# this cell can be re-run without failing on a list.
year_rows_collected = (
    years_rows
    .dropna()
    .dropDuplicates(['Year'])
    .collect()
)

years = sorted(row.Year for row in year_rows_collected)

                                                                                

In [69]:
# Display the sorted list of years found in the data.
years

[2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019]

### check parsed years
Command:
`print(years)`

Output:
`[2001]`

In [70]:
# Sanity check: rows whose timestamp falls in January (any year), after
# redistributing the frame over 24 partitions.
df_test = (
    df_time
    .repartition(24)
    .filter(F.month('Timestamp') == 1)
)

df_test.count()

                                                                                

183674

### check count Jan: csv df, strdate col
Command:
```
df_test = df_time \
    .filter(F.month('Timestamp') == 1)

df_test.count()
```
chicago: `38114` san_francisco: `189723` los_angeles: `183674` austin: `3098`

### check date Jan1: csv df, strdate col
Command:
```
df_test \
    .filter(F.dayofmonth('Timestamp') == 1) \
    .count()
```
chicago: `1825` san_francisco: ` ` los_angeles: ` ` austin: `97`

In [76]:
# Original column names and the snake_case names used in the parquet output.
o_cols = df_time.columns
cols = [col.lower().replace(' ', '_') for col in o_cols]

# Helper columns that are not written out: the parsed timestamp and the raw
# date string it was parsed from (the same settings key used when parsing).
helper_cols = ['Timestamp', dict_city['date_string_col']]
n_partitions = dict_city['partitions']

# One parquet folder per year/month: <bucket>/pq/<city>/<year>/<month>
for year in years:
    df = df_time.filter(F.year('Timestamp') == year)
    for month in range(1, 13):
        # Drop the helper columns while they still carry their original
        # names: once renamed, a name containing spaces no longer matches and
        # drop() would silently keep the column.
        df_month = df \
            .filter(F.month('Timestamp') == month) \
            .drop(*helper_cols)
        # withColumnRenamed is a no-op for columns that are absent
        # (such as the ones just dropped).
        for old_name, new_name in zip(o_cols, cols):
            df_month = df_month.withColumnRenamed(old_name, new_name)
        if n_partitions > 1:
            df_month = df_month.repartition(n_partitions)
        df_month.write.parquet(f'{gcs_bkt}/pq/{city}/{year}/{month}', mode='overwrite')

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 2) / 4]
Traceback (most recent call last):
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark-3.3.0-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [80]:
# Shut down the SparkContext now that the notebook is finished.
sc.stop()