# **Welcome to the PySpark World!**



---





> ## **Why use Spark?**



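A typical data-science workflow on a single machine looks like this:

*   Load the data onto disk
*   Import the data into the machine's memory
*   Process/analyze the data
*   Build the machine learning model
*   Store the predictions back to disk

The bottleneck is the second step: once the data no longer fits in one machine's memory, this workflow breaks down. Spark distributes both the data and the computation across a cluster (or across all the cores of a single machine), so the same steps scale to much larger datasets.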


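> ## **How Does Spark Work?**


Spark's core engine is written in Scala and runs on the Java Virtual Machine. It offers APIs in several languages, including:

*   Python
*   Java
*   Scala
*   SQL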

<font color="blue"> Spark also ships with built-in libraries for data analysis (Spark SQL) and machine learning (MLlib). It is designed to work with Hadoop clusters and can read many file formats, such as CSV and JSON, as well as Hive tables. </font>


> ## **Let's begin!**


In this tutorial, you will learn:


**1.   Install PySpark on Colab**

**2.   Basic operations with PySpark**


  *   Load the dataset (CPDB, the Citizens Police Data Project)
  *   Data preprocessing
  *   Statistics
  *   Basic Visualization
  
**3.   Play with the code**


In [3]:
# Install Java (uncomment on a fresh Colab runtime)
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark (change the version number if needed)
# !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# Extract the Spark archive into the current folder
# !tar xf spark-3.0.0-bin-hadoop3.2.tgz

# Point JAVA_HOME and SPARK_HOME at the installed folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# Install findspark, which locates the Spark installation from SPARK_HOME
!pip install -q findspark
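Before calling `findspark.init()`, it can help to confirm that the two paths set above actually exist, since a folder name that does not match the downloaded Spark or Java version is a common cause of start-up errors. A minimal sketch, using a hypothetical helper `missing_spark_paths` (not part of findspark or PySpark) that only inspects the filesystem:

```python
import os


def missing_spark_paths(env=None, keys=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or point to a missing folder.

    Hypothetical helper for this notebook; it does not start Spark.
    """
    env = os.environ if env is None else env
    missing = []
    for key in keys:
        path = env.get(key)
        # An unset variable and a non-existent directory both count as missing
        if not path or not os.path.isdir(path):
            missing.append(key)
    return missing


problems = missing_spark_paths()
if problems:
    print("Check these settings:", ", ".join(problems))
else:
    print("JAVA_HOME and SPARK_HOME point to existing folders.")
```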

http://apache.mirror.amaze.com.au/spark/spark-2.4.7/

In [None]:
# !gdown --id 1ahn0b2myXJ2eSoJu3z9BatW2T2QLGzvI

Downloading...
From: https://drive.google.com/uc?id=1ahn0b2myXJ2eSoJu3z9BatW2T2QLGzvI
To: /content/cpdb.sql
1.54GB [00:25, 61.3MB/s]


In [4]:
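import findspark
findspark.init()  # make the Spark installation in SPARK_HOME importable

from pyspark.sql import SparkSession
# local[*] runs Spark locally with one worker thread per logical core
spark = SparkSession.builder.master("local[*]").getOrCreate()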

In [5]:
# Spark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, asc, desc, udf
from pyspark.mllib.stat import Statistics
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler

# Python data and plotting libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

In [None]:
# with open("cpdb.sql") as fr:
#    query = fr.read()
# results = sqlContext.sql(query)

In [6]:
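sc = spark.sparkContext
# SQLContext is deprecated as of Spark 3.0; spark.sql(...) on the SparkSession covers the same use
sqlContext = SQLContext(sc)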

In [7]:
# Load the officer table: use the first row as column names and infer column types
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ",") \
    .csv("cpdb_public_data_officer.csv")
print("There are", df.count(), "rows", len(df.columns),
      "columns", "in the data.")

There are 33839 rows 33 columns in the data.


In [8]:
df.show(5)

+-----+------+-----+--------------+---------+-------+----------+----------+---------+----+--------------+-----------+----------------+--------------------+---------------+------------------------------+----------------------------+------------------------------+--------------+----------------+---------------+-------------------------+-------------+--------------+----------------+-----------------------+------------+-----------------+---------+-----------------+---------------+--------------------+--------------------+
|   id|gender| race|appointed_date|     rank| active|birth_year|first_name|last_name|tags|middle_initial|suffix_name|resignation_date|complaint_percentile|middle_initial2|civilian_allegation_percentile|honorable_mention_percentile|internal_allegation_percentile|trr_percentile|allegation_count|sustained_count|civilian_compliment_count|current_badge|current_salary|discipline_count|honorable_mention_count|last_unit_id|major_award_count|trr_count|unsustained_count|has_unique_

In [9]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- race: string (nullable = true)
 |-- appointed_date: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- active: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- middle_initial: string (nullable = true)
 |-- suffix_name: string (nullable = true)
 |-- resignation_date: string (nullable = true)
 |-- complaint_percentile: double (nullable = true)
 |-- middle_initial2: string (nullable = true)
 |-- civilian_allegation_percentile: double (nullable = true)
 |-- honorable_mention_percentile: double (nullable = true)
 |-- internal_allegation_percentile: double (nullable = true)
 |-- trr_percentile: double (nullable = true)
 |-- allegation_count: integer (nullable = true)
 |-- sustained_count: integer (nullable = true)
 |-- civilian_compliment_count: integer (
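`printSchema()` shows both integer and double columns. The same information is available as a list of `(column, type)` string pairs through `df.dtypes`, which is convenient for picking columns programmatically. A small sketch with a hypothetical helper `numeric_columns` that keeps integer and floating-point columns alike (the notebook's own selection below keeps only `int` columns, so the double-typed percentile columns are left out there); the sample pairs are copied from the schema above:

```python
# Type strings Spark uses in df.dtypes for numeric columns
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def numeric_columns(dtypes, exclude=("id",)):
    """Return the names of numeric columns from a df.dtypes-style list.

    `dtypes` is a list of (name, type_string) pairs; decimal types such as
    "decimal(10,2)" are matched by prefix.
    """
    return [
        name
        for name, dtype in dtypes
        if (dtype in NUMERIC_TYPES or dtype.startswith("decimal"))
        and name not in exclude
    ]


sample_dtypes = [
    ("id", "int"),
    ("gender", "string"),
    ("birth_year", "int"),
    ("complaint_percentile", "double"),
    ("allegation_count", "int"),
]
print(numeric_columns(sample_dtypes))
# With the real DataFrame: df.select(numeric_columns(df.dtypes)).describe()
```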

In [17]:
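# Integer columns only (double-typed columns are not included), excluding the id key
numeric_features = [name for name, dtype in df.dtypes if dtype == 'int' and name != 'id']
# describe() computes count, mean, stddev, min and max; toPandas() collects the small
# result to the driver, and transpose() puts one feature per row
df.select(numeric_features).describe().toPandas().transpose()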

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| birth_year | 32897 | 1956.50983372344 | 18.103345669129965 | 1915 | 1996 |
| allegation_count | 33839 | 7.381660214545347 | 10.953876319217938 | 0 | 175 |
| sustained_count | 33839 | 0.649841898401253 | 1.3900380229501774 | 0 | 28 |
| civilian_compliment_count | 33839 | 1.926534472058867 | 2.8720954212558696 | 0 | 69 |
| current_badge | 15364 | 11023.83135902109 | 6576.065097545669 | 1 | 60090 |
| current_salary | 19810 | 86467.81539626452 | 16967.48397171634 | 36984 | 260004 |
| discipline_count | 33839 | 0.5598569697686102 | 1.2636220312945967 | 0 | 28 |
| honorable_mention_count | 33839 | 12.061172020449776 | 23.39852996201198 | 0 | 337 |
| last_unit_id | 29918 | 60.92452704057758 | 74.23352673621002 | 2 | 258 |
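The `count` row of `describe()` counts only non-null values: `birth_year` has 32,897 values out of 33,839 rows, so 942 officers have no recorded birth year, and `current_badge` and `current_salary` are missing for even more rows. The `stddev` row is the sample standard deviation (denominator n - 1). A plain-Python sketch of the same five statistics for one column, where `None` stands in for a null; `summarize` is a hypothetical helper for illustration, not a PySpark function:

```python
import statistics


def summarize(values):
    """Compute describe()-style statistics for one column, ignoring None."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),  # non-null values only
        "mean": statistics.mean(present) if present else None,
        # Sample standard deviation (n - 1 denominator); needs at least two values
        "stddev": statistics.stdev(present) if len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }


print(summarize([1950, None, 1960, 1970]))
```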


In [18]:
df.groupby('race').count().show()

+--------------------+-----+
|                race|count|
+--------------------+-----+
|       Asian/Pacific|  544|
|             Unknown|  185|
|               White|20722|
|Native American/A...|   67|
|            Hispanic| 4599|
|               Black| 7722|
+--------------------+-----+
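The six groups add up to 33,839, the same total as the row count reported earlier, and no `null` group appears, so every officer has a `race` value (possibly `Unknown`). Raw counts are easier to compare as shares of the total. The arithmetic is sketched below in plain Python using the counts from the output above (the `Native American/A...` label is cut off by `show()` and is kept as printed); a commented PySpark version at the end shows one way to do the same on the DataFrame, offered as a sketch rather than the notebook's method:

```python
# Counts copied from the groupby('race').count() output above
race_counts = {
    "Asian/Pacific": 544,
    "Unknown": 185,
    "White": 20722,
    "Native American/A...": 67,  # label truncated by show()
    "Hispanic": 4599,
    "Black": 7722,
}

total = sum(race_counts.values())
shares = {race: round(100 * n / total, 2) for race, n in race_counts.items()}

# Largest group first
for race, pct in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{race:<22}{pct:>6.2f}%")

# PySpark sketch of the same calculation:
# df.groupby("race").count() \
#     .withColumn("pct", F.round(100 * F.col("count") / df.count(), 2)) \
#     .orderBy(desc("count")).show()
```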

