# PySpark by Example

## Installing dependencies and updating

In [None]:
!apt-get update
!apt-get upgrade
!pip install pyspark
!apt-get autoremove

## Initializing Spark context

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp,col,lit

# Create the Spark session used by every cell below, or reuse the one that is
# already running in this process.
spark = (
    SparkSession.builder
    .appName("PySpark by Example")
    .getOrCreate()
)

## Getting the data

In [None]:
#!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

## Reading the data

In [None]:
# Load the CSV using its first line as column names, then replace the textual
# 'Date' column with a timestamp parsed from the 'MM/dd/yyyy hh:mm:ss a' pattern.
df = (
    spark.read
    .csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
)

In [None]:
#df.filter(df.Arrest == 1)

## Reading the automatically inferred data schema

In [None]:
# Print the column names and types of `df` as a tree.
# No schema and no inferSchema option were passed to read.csv, so only the
# converted 'Date' column is expected to be non-string here.
df.printSchema()

## Importing SQL types from PySpark

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType

## Creating a schema for dataset

In [None]:
# (column name, Spark SQL type) pairs used to build the explicit schema.
# Every column is a string unless it appears in the override table below.
# NOTE(review): 'Arrest' stays a string while 'Domestic' is boolean —
# confirm this asymmetry is intended.
_non_string_types = {
    'Date': TimestampType(),
    'Domestic': BooleanType(),
    'Year': IntegerType(),
    'Latitude': DoubleType(),
    'Longitude': DoubleType(),
}

labels = [
    (name, _non_string_types.get(name, StringType()))
    for name in (
        'ID',
        'Case Number',
        'Date',
        'Block',
        'IUCR',
        'Primary Type',
        'Description',
        'Location Description',
        'Arrest',
        'Domestic',
        'Beat',
        'District',
        'Ward',
        'Community Area',
        'FBI Code',
        'X Coordinate',
        'Y Coordinate',
        'Year',
        'Updated On',
        'Latitude',
        'Longitude',
        'Location',
    )
]

In [None]:
# Turn each (name, type) pair from `labels` into a nullable field and collect
# the fields into a single StructType.
schema = StructType([
    StructField(name, dtype, nullable=True)
    for name, dtype in labels
])
# Leave the schema as the last expression so the notebook displays it.
schema

## Reading the data using the previously defined schema

In [None]:
# Re-read the CSV with the explicit schema instead of all-string columns.
# The schema declares 'Date' as a timestamp, so the reader itself must be told
# the file's date pattern; otherwise the values cannot be parsed and become
# null, and a later to_timestamp() call has nothing left to convert.
df = spark.read.csv(
    'reported-crimes.csv',
    schema=schema,
    header=True,
    timestampFormat='MM/dd/yyyy hh:mm:ss a',
)

In [None]:
# Print the column names and types of `df` as a tree, to check the types the
# DataFrame ended up with after reading with an explicit schema.
df.printSchema()

## Manipulating rows

In [None]:
# Select every crime reported on 2018-11-12.
# 'Date' is a timestamp, so comparing it directly with '2018-11-12' would only
# match rows stamped exactly 00:00:00; compare on the calendar date instead.
one_day = (
    spark.read
    .csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date').cast('date') == lit('2018-11-12').cast('date'))
)

## Working with dates

In [None]:
#from pyspark.sql import functions
from pyspark.sql.functions import to_date, to_timestamp, lit

In [None]:
# One-row, one-column DataFrame holding a date-time string to parse below.
# Note that this rebinds `df`.
df = spark.createDataFrame(
    data=[('2019-12-25 13:30:00',)],
    schema=['Christmas'],
)

In [None]:
# Display the rows of `df` as a text table.
df.show()

In [None]:
# Parse the same string twice with the same pattern: once as a date and once
# as a full timestamp, and show both results side by side.
df.select(
    to_date(col('Christmas'), 'yyyy-MM-dd HH:mm:ss'),
    to_timestamp(col('Christmas'), 'yyyy-MM-dd HH:mm:ss'),
).show()
