<a href="https://colab.research.google.com/github/alvarofernandezmalagon/Basic_Operations_PySpark/blob/master/basic_operations_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Objective




The objective of this notebook is to review the basic operations of PySpark DataFrames.

# Set up the PySpark environment

In [0]:
#We have to install Spark and Java in Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Export the Java and Spark install locations so PySpark can be started
# from this Colab runtime.
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
    }
)

In [0]:
# Start a local Spark session to check that the installation works.
import findspark

# Keep this call before the pyspark import: only findspark is pip-installed
# in this notebook, so the import below relies on findspark.init().
findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [4]:
# Display the session object to confirm it was created
spark

# Get data

### Get access to Google Drive


In [0]:
from google.colab import drive

In [6]:
# Mount Google Drive under /content/drive (force_remount=True is passed to the mount call)
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


### Change the directory

In [7]:
# Move into the script folder so the relative ../00-Data paths used in the next cells resolve
%cd /content/drive/My Drive/02-Colab/01-PySpark/00-Repaso_Pyspark/01-Script

/content/drive/My Drive/02-Colab/01-PySpark/00-Repaso_Pyspark/01-Script


### Unzip the data

In [8]:
# List the contents of the data folder
!ls ../00-Data

fire-incidents.csv  fire-incidents.csv.zip


In [9]:
!unzip ../00-Data/fire-incidents.csv.zip -d ../00-Data/

Archive:  ../00-Data/fire-incidents.csv.zip
  inflating: ../00-Data/fire-incidents.csv  


In [10]:
# Long listing of the data folder to check the size of the extracted CSV
!ls -l ../00-Data

total 236153
-rw------- 1 root root 207085551 Dec  6 12:15 fire-incidents.csv
-rw------- 1 root root  34734540 May  5 15:36 fire-incidents.csv.zip


# Import libraries

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Build the path to the data file

In [12]:
# Assemble the absolute Drive path of the CSV from its folder and file name.
filename = 'fire-incidents.csv'
data_path = ''.join(
    ['/content/drive/My Drive/02-Colab/01-PySpark/00-Repaso_Pyspark/00-Data/', filename]
)
print(data_path)

/content/drive/My Drive/02-Colab/01-PySpark/00-Repaso_Pyspark/00-Data/fire-incidents.csv


# Load the data into a Spark DataFrame

In [0]:
# Read the CSV with a header row, comma separator and inferred column types.
fire_df = (
    spark.read
    .options(header=True, inferSchema=True, sep=",")
    .csv(data_path)
)

In [14]:
# Preview the first 5 rows without truncating the cell values
fire_df.show(5,truncate=False)

+---------------+---------------+----------------------+-------------------+-----------+-------------------+-------------------+-------------------+-------------+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+----------------------------------------------------+----------+--------------------+----------------------+------------------+--------------------------+---------------------------------------------+-------------------+--------------+-----------------------+-------------------------+-----------+------------------+--------------------------------------+--------------+----------------+--------------------+-----------+--------------+------------------------------------+----------------------------------------+----------------------------------+---------

### Get the columns and types

In [15]:
# Print each column's name, inferred type and nullability
fire_df.printSchema()

root
 |-- Incident Number: integer (nullable = true)
 |-- Exposure Number: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- Incident Date: timestamp (nullable = true)
 |-- Call Number: integer (nullable = true)
 |-- Alarm DtTm: timestamp (nullable = true)
 |-- Arrival DtTm: timestamp (nullable = true)
 |-- Close DtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Suppression Units: integer (nullable = true)
 |-- Suppression Personnel: integer (nullable = true)
 |-- EMS Units: integer (nullable = true)
 |-- EMS Personnel: integer (nullable = true)
 |-- Other Units: integer (nullable = true)
 |-- Other Personnel: integer (nullable = true)
 |-- First Unit On Scene: string (nullable = true)
 |-- Estimated Property Loss: integer (nullable = true)
 |-- Estimated Contents Loss: double (nullab

### Get the structure

#### Get the number of columns and rows

In [16]:
# Number of rows in the DataFrame
fire_df.count()

513405

In [17]:
# Number of columns in the DataFrame
len(fire_df.columns)

63

# Basic PySpark operations

## Rename the columns

In [18]:
# Normalise every column name to snake_case (lower-case, spaces -> underscores).
# toDF renames all columns in a single projection instead of chaining one
# withColumnRenamed call per column, and the comprehension leaves no loop
# variables behind in the notebook namespace.
fire_df = fire_df.toDF(*[name.lower().replace(" ", "_") for name in fire_df.columns])
fire_df.columns

['incident_number',
 'exposure_number',
 'address',
 'incident_date',
 'call_number',
 'alarm_dttm',
 'arrival_dttm',
 'close_dttm',
 'city',
 'zipcode',
 'battalion',
 'station_area',
 'box',
 'suppression_units',
 'suppression_personnel',
 'ems_units',
 'ems_personnel',
 'other_units',
 'other_personnel',
 'first_unit_on_scene',
 'estimated_property_loss',
 'estimated_contents_loss',
 'fire_fatalities',
 'fire_injuries',
 'civilian_fatalities',
 'civilian_injuries',
 'number_of_alarms',
 'primary_situation',
 'mutual_aid',
 'action_taken_primary',
 'action_taken_secondary',
 'action_taken_other',
 'detector_alerted_occupants',
 'property_use',
 'area_of_fire_origin',
 'ignition_cause',
 'ignition_factor_primary',
 'ignition_factor_secondary',
 'heat_source',
 'item_first_ignited',
 'human_factors_associated_with_ignition',
 'structure_type',
 'structure_status',
 'floor_of_fire_origin',
 'fire_spread',
 'no_flame_spead',
 'number_of_floors_with_minimum_damage',
 'number_of_floors_wit

## Select columns

### Using column names

In [19]:
# Names of the first 5 columns (a plain Python list slice, no data is read)
fire_df.columns[:5]

['incident_number',
 'exposure_number',
 'address',
 'incident_date',
 'call_number']

In [20]:
# Select the first 3 columns and show their first 5 rows
fire_df.select(fire_df.columns[:3]).show(5)

+---------------+---------------+--------------------+
|incident_number|exposure_number|             address|
+---------------+---------------+--------------------+
|       19146215|              0|925 Golden Gate A...|
|       19146165|              0| 619 Holloway Avenue|
|       19146202|              0|    1485 Pine Street|
|       19146182|              0|       Church Street|
|       19146193|              0|   1430 Scott Street|
+---------------+---------------+--------------------+
only showing top 5 rows



In [21]:
# Alternative: pass the column names explicitly
fire_df.select('incident_number','exposure_number').show(5)

+---------------+---------------+
|incident_number|exposure_number|
+---------------+---------------+
|       19146215|              0|
|       19146165|              0|
|       19146202|              0|
|       19146182|              0|
|       19146193|              0|
+---------------+---------------+
only showing top 5 rows



### Using `pyspark.sql.functions`

In [22]:
# Build a Column expression that refers to a column by name
F.col('incident_number')

Column<b'incident_number'>

## Filtering and selecting columns

In [23]:
# Keep the rows whose call_number is greater than 10030109, then show
# only that column for the first 5 matches.
(
    fire_df
    .filter(F.col("call_number") > 10030109)
    .select("call_number")
    .show(5)
)

+-----------+
|call_number|
+-----------+
|  193390376|
|  193390050|
|  193390291|
|  193390165|
|  193390205|
+-----------+
only showing top 5 rows

