<a href="https://colab.research.google.com/github/absabry/Pyspark-tutorial/blob/master/2.%20schemas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Using Schemas

## Check for JAVA_HOME

In [6]:
import os

# Check for the correct Java version (should be 1.8 for using Spark).
# Prints None when JAVA_HOME is not set.
print(os.environ.get("JAVA_HOME"))

/usr/lib/jvm/java-8-openjdk-amd64/Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home


In [16]:
import findspark
findspark.init()  # run before the pyspark imports below

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse an existing context/session if one is already running, otherwise create them.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [2]:
import pandas as pd

## Preprocessing Chicago's Reported Crime Data using Pandas and Spark

In [12]:
from pyspark.sql.functions import col, to_timestamp

# Read the CSV using its first line as column names, then replace the 'Date'
# column with a timestamp parsed from the 'MM/dd/yyyy hh:mm:ss a' text format.
rc = (
    spark.read
    .csv('Crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
)

In [15]:
# Number of rows in the Spark DataFrame loaded from Crimes.csv.
rc.count()

6820155

In [13]:
# Load the same CSV with pandas so its shape can be compared with the Spark row count.
data_pd = pd.read_csv('Crimes.csv')

In [14]:
# (rows, columns) of the pandas frame; the row count should match rc.count().
data_pd.shape

(6820155, 22)

#### Trying different ways of getting rows from a Spark DataFrame

In [None]:
# take(5): the first five rows, returned as a list of Row objects.
rc.take(5)

In [None]:
# head(5): also returns the first five rows as a list of Row objects.
rc.head(5)

In [None]:
# limit(5): returns a new DataFrame restricted to five rows, not the rows themselves;
# chain .show() or .collect() on it to actually see the data.
rc.limit(5)

## Schemas

In [None]:
# Print every column with its data type and nullability.
rc.printSchema()

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, TimestampType, BooleanType, DoubleType, IntegerType

In [None]:
# Columns that need a non-string Spark type, grouped by target type.
boolean_labels = ['Domestic']
timestamp_labels = ['Date']
double_labels = ['Latitude', 'Longitude']
int_labels = ['Year']

# Every remaining column is treated as a string column.
string_labels = list(
    set(rc.columns).difference(
        boolean_labels, timestamp_labels, double_labels, int_labels
    )
)

In [None]:
# Map each column name to the Spark type class it should be read as.
# The label lists are visited in the same precedence as before (boolean, string,
# int, double, timestamp) and setdefault keeps the first match, so a column that
# appears in two lists produces exactly one field instead of a duplicate.
spark_type_for = {}
for labels, spark_type in (
    (boolean_labels, BooleanType),
    (string_labels, StringType),
    (int_labels, IntegerType),
    (double_labels, DoubleType),
    (timestamp_labels, TimestampType),
):
    for label in labels:
        spark_type_for.setdefault(label, spark_type)

# One nullable field per column, in the DataFrame's own column order.
# A column missing from every label list falls back to StringType rather than
# being dropped, because a dropped field would shift every later column when the
# schema is applied positionally to the CSV.
schema = StructType([
    StructField(name, spark_type_for.get(name, StringType)(), True)
    for name in rc.columns
])

In [None]:
# Re-read the data with the explicit schema instead of all-string columns.
#  - header=True keeps the column-name line from being parsed as a data record.
#  - timestampFormat tells the reader how 'Date' is written (the same pattern that
#    was passed to to_timestamp when the file was first loaded); without it the
#    values fail to parse and show up as null.
#  - The path is the file the schema was derived from, so columns line up.
rc = spark.read.csv(
    'Crimes.csv',
    schema=schema,
    header=True,
    timestampFormat='MM/dd/yyyy hh:mm:ss a',
)
rc.printSchema()

In [None]:
# If rows come back as null, the schema or the read options do not match the file
# (for example the header line or the timestamp format), or the data itself is missing.
rc.show(5)

In [None]:
# Project two columns and print their first five rows.
rc.select('ID', 'District').show(5)

In [None]:
# Change the column name 'Description' to 'Desc'.
rc = rc.withColumnRenamed(existing='Description', new='Desc')

In [None]:
# List the column names to confirm the rename.
rc.columns

In [None]:
# drop() returns a new DataFrame without 'ID'; rc itself is left unchanged.
rc_without_ID = rc.drop('ID')

In [None]:
# Column names of the new DataFrame; 'ID' should no longer be listed.
rc_without_ID.columns