In [3]:
# # !pip install pyspark

In [4]:
from pyspark.sql import SparkSession
import pandas as pd

In [5]:
spark=SparkSession.builder.appName('Practise').getOrCreate()
spark

# Load data - read.csv

In [6]:
# df_pyspark=spark.read.csv('datasets/cal_housing.csv')
# we should use inferSchema=True to get correct types, or all types will be string
# df_pyspark=spark.read.option('header','true').csv('datasets/cal_housing.csv',inferSchema=True)
# or
df_pyspark=spark.read.csv('datasets/cal_housing.csv',header=True,inferSchema=True)
df_pyspark.show(5)

+-----------+---------+---------+----------+----------+----------+----------+--------+-------------+
|-122.230000|37.880000|41.000000|880.000000|129.000000|322.000000|126.000000|8.325200|452600.000000|
+-----------+---------+---------+----------+----------+----------+----------+--------+-------------+
|    -122.22|    37.86|     21.0|    7099.0|    1106.0|    2401.0|    1138.0|  8.3014|     358500.0|
|    -122.24|    37.85|     52.0|    1467.0|     190.0|     496.0|     177.0|  7.2574|     352100.0|
|    -122.25|    37.85|     52.0|    1274.0|     235.0|     558.0|     219.0|  5.6431|     341300.0|
|    -122.25|    37.85|     52.0|    1627.0|     280.0|     565.0|     259.0|  3.8462|     342200.0|
|    -122.25|    37.85|     52.0|     919.0|     213.0|     413.0|     193.0|  4.0368|     269700.0|
+-----------+---------+---------+----------+----------+----------+----------+--------+-------------+
only showing top 5 rows



# set columns names

In [7]:
df=df_pyspark.toDF('longitude','latitude','housingMedianAge','totalRooms','totalBedrooms','population','households',
                   'medianIncome','medianHouseValue' )

In [8]:
df.show(3)

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|medianHouseValue|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|
|  -122.25|   37.85|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
only showing top 3 rows



In [9]:
df.head(3)

[Row(longitude=-122.22, latitude=37.86, housingMedianAge=21.0, totalRooms=7099.0, totalBedrooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.3014, medianHouseValue=358500.0),
 Row(longitude=-122.24, latitude=37.85, housingMedianAge=52.0, totalRooms=1467.0, totalBedrooms=190.0, population=496.0, households=177.0, medianIncome=7.2574, medianHouseValue=352100.0),
 Row(longitude=-122.25, latitude=37.85, housingMedianAge=52.0, totalRooms=1274.0, totalBedrooms=235.0, population=558.0, households=219.0, medianIncome=5.6431, medianHouseValue=341300.0)]

In [10]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

# To check the data types of the columns (Schema)

In [11]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)



In [12]:
df.columns

['longitude',
 'latitude',
 'housingMedianAge',
 'totalRooms',
 'totalBedrooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

# Select few columns to display

In [13]:
df.select(['medianIncome','medianHouseValue']).show(3)

+------------+----------------+
|medianIncome|medianHouseValue|
+------------+----------------+
|      8.3014|        358500.0|
|      7.2574|        352100.0|
|      5.6431|        341300.0|
+------------+----------------+
only showing top 3 rows



In [14]:
df.describe().show()

+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|  housingMedianAge|        totalRooms|     totalBedrooms|        population|        households|      medianIncome|  medianHouseValue|
+-------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|              20639|             20639|             20639|             20639|             20639|             20639|             20639|             20639|             20639|
|   mean|-119.56957556082855|35.631752507389095|28.638887543001115|2635.8481515577305| 537.9178254760405|1425.5302097969864|499.55777896215903|3.8704551722467655|206843.91012161443|
| stddev|  2.003494680133486|2.1359468160147936|12.585568404719403|2181.6338702978455|421.

# Adding new column

In [15]:
# Note: this is not in place operation we need to assign it to a variable
df=df.withColumn('population After 2 yesrs',df['population']+1000)
df.show(3)

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+------------------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|medianHouseValue|population After 2 yesrs|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+------------------------+
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|                  3401.0|
|  -122.24|   37.85|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|                  1496.0|
|  -122.25|   37.85|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|                  1558.0|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+------------------------+
only showing top 3 r

# Drop columns

In [16]:
df=df.drop('population After 2 yesrs','longitude','latitude')
df.show(3)

+----------------+----------+-------------+----------+----------+------------+----------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|medianHouseValue|
+----------------+----------+-------------+----------+----------+------------+----------------+
|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|
|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|
|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|
+----------------+----------+-------------+----------+----------+------------+----------------+
only showing top 3 rows



# Rename the columns

In [17]:
df=df.withColumnRenamed('medianHouseValue','Median_House_Value')
df.show(3)

+----------------+----------+-------------+----------+----------+------------+------------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|Median_House_Value|
+----------------+----------+-------------+----------+----------+------------+------------------+
|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|          358500.0|
|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|          352100.0|
|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|          341300.0|
+----------------+----------+-------------+----------+----------+------------+------------------+
only showing top 3 rows



In [18]:
# Drop all null values
df.na.drop();

In [19]:
# Drop using 'how'
# all will drop if all row is null, and any will drop if any is null 
df.na.drop(how='all');

In [20]:
# thresh=2 will keep the row if there is two not null values in the row  
df.na.drop(how='all',thresh=2);

In [21]:
# subset will delete any rows if the value in spicefied column is null 
df.na.drop(how='all',subset=['Median_House_Value']);

In [22]:
# filling the missing value, this will fill the missing value with 0 
df.na.fill(0,['totalRooms','totalBedrooms']);

In [23]:
# fill using Imputer function, will fill missing values with mean
from pyspark.ml.feature import Imputer

imputer=Imputer(
inputCols=['totalRooms','totalBedrooms'],
    outputCols=["{}_imputed".format(c) for c in ['totalRooms','totalBedrooms']],
    ).setStrategy("mean")

In [24]:
# Add imputation cols to df
imputer.fit(df).transform(df).show(3)

+----------------+----------+-------------+----------+----------+------------+------------------+------------------+---------------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|Median_House_Value|totalRooms_imputed|totalBedrooms_imputed|
+----------------+----------+-------------+----------+----------+------------+------------------+------------------+---------------------+
|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|          358500.0|            7099.0|               1106.0|
|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|          352100.0|            1467.0|                190.0|
|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|          341300.0|            1274.0|                235.0|
+----------------+----------+-------------+----------+----------+------------+------------------+------------------+---------------------+
only showing top 3 rows



# Filter Operations

In [25]:
# Median_House_Value grater than or equl to 500,000
df.filter('Median_House_Value>=500000').show()

+----------------+----------+-------------+----------+----------+------------+------------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|Median_House_Value|
+----------------+----------+-------------+----------+----------+------------+------------------+
|            52.0|     249.0|         78.0|     396.0|      85.0|      1.2434|          500001.0|
|            52.0|     609.0|        236.0|    1349.0|     250.0|      1.1696|          500001.0|
|            52.0|    1668.0|        225.0|     517.0|     214.0|      7.8521|          500001.0|
|            52.0|    3726.0|        474.0|    1366.0|     496.0|      9.3959|          500001.0|
|            52.0|    2990.0|        379.0|     947.0|     361.0|      7.8772|          500001.0|
|            39.0|    2492.0|        310.0|     808.0|     315.0|     11.8603|          500001.0|
|            42.0|    2991.0|        335.0|    1018.0|     335.0|      13.499|          500001.0|
|            52.0|  

In [26]:
# selecting few columns
df.filter(df['Median_House_Value']<=30000).select(['medianIncome','Median_House_Value']).show()

+------------+------------------+
|medianIncome|Median_House_Value|
+------------+------------------+
|       2.675|           22500.0|
|      1.6607|           14999.0|
|      1.2132|           30000.0|
|      0.8571|           25000.0|
|         2.1|           14999.0|
|      2.6389|           30000.0|
|      2.3013|           26600.0|
|      2.1955|           26900.0|
|      2.3667|           17500.0|
|      4.1932|           14999.0|
|      1.2656|           27500.0|
|      2.7138|           22500.0|
|      1.0918|           22500.0|
|      0.7917|           22500.0|
|      2.7377|           28300.0|
|       0.536|           14999.0|
+------------+------------------+



In [27]:
# multiple conditions
df.filter((df['Median_House_Value']>=500000) & 
          (df['totalBedrooms']<=5)).show()

+----------------+----------+-------------+----------+----------+------------+------------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|Median_House_Value|
+----------------+----------+-------------+----------+----------+------------+------------------+
|            52.0|       8.0|          1.0|      13.0|       1.0|     15.0001|          500001.0|
|            46.0|      30.0|          4.0|      13.0|       5.0|     15.0001|          500001.0|
+----------------+----------+-------------+----------+----------+------------+------------------+



In [28]:
# (~) inverse operation (not), this will show anything not matching our conditions
df.filter(~(df['Median_House_Value']>=500000)).show()

+----------------+----------+-------------+----------+----------+------------+------------------+
|housingMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|Median_House_Value|
+----------------+----------+-------------+----------+----------+------------+------------------+
|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|          358500.0|
|            52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|          352100.0|
|            52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|          341300.0|
|            52.0|    1627.0|        280.0|     565.0|     259.0|      3.8462|          342200.0|
|            52.0|     919.0|        213.0|     413.0|     193.0|      4.0368|          269700.0|
|            52.0|    2535.0|        489.0|    1094.0|     514.0|      3.6591|          299200.0|
|            52.0|    3104.0|        687.0|    1157.0|     647.0|        3.12|          241400.0|
|            42.0|  

# Group by

In [36]:
df.groupby('housingMedianAge').sum('Median_House_Value').show(3)

+----------------+-----------------------+
|housingMedianAge|sum(Median_House_Value)|
+----------------+-----------------------+
|             8.0|            4.0049404E7|
|             7.0|            3.3826806E7|
|            49.0|            2.9525008E7|
+----------------+-----------------------+
only showing top 3 rows

