In [1]:
# setting up spark environment
import os
# os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# os.environ['SPARK_HOME'] = '/content/spark-2.3.1-bin-hadoop2.7'

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark



In [2]:
# Testing out data reader
from pyspark.sql.functions import to_timestamp, col, lit
FILE_PATH = '/Users/ferdinand/Desktop/dataset/'
dataset = os.path.join(FILE_PATH,'reported-crimes.csv')

rc = spark.read.csv(dataset, header=True).withColumn('Date',to_timestamp(col('Date'),'MM-dd-yyyy hh-mm-ss aa') > lit('2018-11-12'))

In [None]:
#DATA FRAME OPERATIONS
#df.take(n)
#df.collect --> return all datasets
#df.show(n)
#df.limit(n)
#df.head(n)

In [None]:
# DATA SCHEMA
# rc.types
# df.printschema()

In [6]:
rc.columns
rc.show(2)

[('ID', 'string'),
 ('Case Number', 'string'),
 ('Date', 'boolean'),
 ('Block', 'string'),
 ('IUCR', 'string'),
 ('Primary Type', 'string'),
 ('Description', 'string'),
 ('Location Description', 'string'),
 ('Arrest', 'string'),
 ('Domestic', 'string'),
 ('Beat', 'string'),
 ('District', 'string'),
 ('Ward', 'string'),
 ('Community Area', 'string'),
 ('FBI Code', 'string'),
 ('X Coordinate', 'string'),
 ('Y Coordinate', 'string'),
 ('Year', 'string'),
 ('Updated On', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Location', 'string')]

In [None]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType, BooleanType, DoubleType, IntegerType
# structType([
#     StructField('ID', StringType, True),
#     StructField('Case Number', StringType, True)
#     StructField('Date',TimestampType, True)
#      'Block',
#      'IUCR',
#      'Primary Type',
#      'Description',
#      'Location Description',
#      'Arrest',
#      'Domestic',
#      'Beat',
#      'District',
#      'Ward',
#      'Community Area',
#      'FBI Code',
#      'X Coordinate',
#      'Y Coordinate',
#      'Year',
#      'Updated On',
#      'Latitude',
#      'Longitude',
#      'Location'
    
# ])

In [None]:
labels = [
    ('ID',StringType()),
    ('Case Number',StringType()),
    ('Date',TimestampType()),
    ('Block',StringType()),
    ('IUCR',StringType()),
    ('Primary Type',StringType()),
    ('Description',StringType()),
    ('Location Description',StringType()),
    ('Arrest',StringType()),
    ('Domestic',BooleanType()),
    ('Beat',StringType()),
    ('District',StringType()),
    ('Ward',StringType()),
    ('Community Area',StringType()),
    ('FBI Code',StringType()),
    ('X Coordinate',StringType()),
    ('Y Coordinate',StringType()),
    ('Year',IntegerType()),
    ('Updated On',StringType()),
    ('Latitude',DoubleType()),
    ('Longitude',DoubleType()),
    ('Location',StringType())]

In [None]:
schema = StructType([StructField (x[0],x[1],True) for x in labels])
schema

In [None]:
rc = spark.read.csv(dataset, schema=schema)
rc.printSchema()

In [None]:
# COLUMN OPERATIONS
#df.withColumn('DoubleColumn',2*df['current']) --> add new column
#df.withColumnRenamed(current_name,new_name)
#df.drop('column_name')
#df.groupBy('column')
#df.select('col_name').show(n)
#df.select(rc.col_name).show(n)
#df.select(col('col_name')).show(n)
#df.select('col1','col2','col3').show(n)

In [None]:
# add constant
from pyspark.sql.functions import lit
rc.withColumn('One',lit(1)).show(2)

In [33]:
# rc = rc.drop('IUCR')
rc.select('IUCR').distinct().count()
rc.select('IUCR').distinct().show(5)

+----+
|IUCR|
+----+
|1090|
|1512|
|1572|
|2110|
|0895|
+----+
only showing top 5 rows



In [None]:
# ROW MANIPULATION
#df.filter(col('col_name') > 1)
#df.select(col).distinct().show() --> unique row
#df.orderBy(col('col_name'))--> sorting
#df.union(df2)--> appending row

In [None]:
one_day = spark.read.csv(dataset, header=True).withColumn('Date',to_timestamp(col('Date'),'MM-dd-yyyy hh:mm:ss aa') == lit('2018-11-12'))
one_day.count()

In [None]:
# rc.union(one_day).orderBy('Date',ascending=False).show(5)

In [None]:
rc.groupBy('Primary Type').count().orderBy('count', ascending=False).show(10)

In [None]:
rc.select('Arrest').distinct().show() #similar to .unique() in pd 
rc.printSchema()

In [None]:
# percentage of crime that resulted in an arrest
rc.filter(col('Arrest') == 'true').count()/rc.select('Arrest').count()
rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(3)

In [None]:
#built in function
from pyspark.sql import functions
# print(dir(functions))

In [None]:
from pyspark.sql.functions import lower, upper, substring

In [None]:
help(substring)

In [None]:
rc.select(lower(col('Primary Type')), upper(col('Primary Type')), substring(col('Primary Type'),1,5)).show(5)

In [None]:
from pyspark.sql.functions import min, max

In [None]:
rc.select(min(col('Date')), max(col('Date'))).show(1)

In [None]:
from pyspark.sql.functions import date_add, date_sub
help(date_add)

In [None]:
rc.select(date_sub(min(col('Date')),3), date_add(max(col('Date')),3)).show(1)

In [None]:
rc.select('Date').show(10)

In [10]:
# DATE MANIPULATION
from pyspark.sql.functions import to_date, to_timestamp, lit
# 2019-12-25 13:30:00'
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['Christmas'])
df.show(1)




+-------------------+
|          Christmas|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+



In [13]:
df.select(to_date(col('Christmas'), 'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('Christmas'), 'yyyy-MM-dd HH:mm:ss')).show(1)




+-------------------------------------------+------------------------------------------------+
|to_date(`Christmas`, 'yyyy-MM-dd HH:mm:ss')|to_timestamp(`Christmas`, 'yyyy-MM-dd HH:mm:ss')|
+-------------------------------------------+------------------------------------------------+
|                                 2019-12-25|                             2019-12-25 13:30:00|
+-------------------------------------------+------------------------------------------------+



In [21]:
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['Christmas'])
df.select(to_date(col('Christmas'), 'dd/MM/yyyy HH:mm:ss'), to_timestamp(col('Christmas'), 'dd/MM/yyyy HH:mm:ss')).show(1)


+-------------------------------------------+------------------------------------------------+
|to_date(`Christmas`, 'dd/MM/yyyy HH:mm:ss')|to_timestamp(`Christmas`, 'dd/MM/yyyy HH:mm:ss')|
+-------------------------------------------+------------------------------------------------+
|                                       null|                                            null|
+-------------------------------------------+------------------------------------------------+



In [29]:
nrc = spark.read.csv(dataset, header='True')
nrc.show(2, truncate=True)



+--------+-----------+--------------------+------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|                Date|             Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+--------------------+------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|01/01/2001 11:00:...|   016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11|     

In [35]:
#Data manipulation
from pyspark.sql.functions import lpad
help(lpad)
#JOIN


Help on function lpad in module pyspark.sql.functions:

lpad(col, len, pad)
    Left-pad the string column to width `len` with `pad`.
    
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
    [Row(s='##abcd')]
    
    .. versionadded:: 1.5



In [None]:
rc.join(ps, rc.col == ps.col, 'left-outer').show(5, truncate=False)
nc = rc.filter( (col('Primary Type') == 'NON - CRIMINAL') | (col('Primary Type') == 'NON-CRIMINAL') | ((col('Primary Type') == 'NON-CRIMINAL(SUB)')) )
nc.groupBy(col('Description')).count(orderBy('count', ascending=False).show(5,Truncate=False))




from pyspark.sql.functions import date_format, dayofweek

# get day of the week
rc.select(col('Date'),dayofweek(col('Date')), date_format(col('Date'),'E')).show(5)
rc.groupBy(date_format(col('Date'),'E')).count().orderBy('count', ascending=False).show()


In [None]:
# use pandas to chart to access it, use .collect()
rc.groupBy(date_format(col('Date'),'E')).count().collec()
dow = [x[0] for x in rc.groupBy(date_format(col('Date')).count().collect())]
cnt = [x[1] for x in rc.groupBy(date_format(col('Date')).count().collect())]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

cp = pd.DataFrame({'Day_of_week':dow, 'Count':cnt})
cp.head()
cp.sort_values('Count', ascending=False).plot(kind='bar', color='olive' x='Day_of_week', y='Count')
plt.xlabel('Day of the week')
plt.ylabel('number of reported crimes')
plt.title('No. of reported crimes per day of the week from 2001')





In [None]:
# WHEN TO USE RDD
