<a href="https://colab.research.google.com/github/Manu-Singh22/Data_Generator_repo/blob/Spark/SparkML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:

pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 62 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 73.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=6ef114a35ede24e388384e34d9d826635bb0cd930c1b79eae5a31353d6660fce
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
# Import relevant lib

from pyspark.sql import SparkSession



In [3]:
# Creating a Spark Session

spark=SparkSession.builder\
.master('local[4]')\
.appName('SparkML')\
.getOrCreate()

In [20]:
# loading the Housing data

data=spark.read.format('csv').load('housing.csv', header=True,inferSchema=True)

In [21]:
# Printing the schema of data 

data.printSchema()

# we can see that only ocean_proximity is categorical feature all others a double type

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [22]:
# Take a glance of the data

data.show(10)

# we can see that there is no index in it

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [23]:
# Adding Index to the data

from pyspark.sql.functions import monotonically_increasing_id

data=data.withColumn('Id',monotonically_increasing_id())

# Arranging the columns

data=data[['id']+data.columns[:-1]]

In [24]:
#  Take a look after adding the index

data.show(10)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [25]:
# Lets see the count of records 

data.count()

20640

In [26]:
# Lets have some fun with data

# Aggregation of two columns using agg function 

data.select(['total_rooms','housing_median_age']).agg({'total_rooms':'avg','housing_median_age':'avg'}).show()

+-----------------------+------------------+
|avg(housing_median_age)|  avg(total_rooms)|
+-----------------------+------------------+
|     28.639486434108527|2635.7630813953488|
+-----------------------+------------------+



In [27]:
# Average of all columns using pyspark mean function

from pyspark.sql.functions import mean

data.select(*[mean(c) for c in data.columns]).show()

# One thing to notice here is that ocean_proximity avg is null

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                null|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [28]:
# Trying Out Groupby function on ocean Poximity

data.groupby('ocean_proximity').agg({col:'avg' for col in data.columns[3:-2]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|      37.73013100436681|
|      <1H OCEAN| 517.7449649737302|1520.2904991243433|  546.5391852999778|4.2306819176882655|2628.3435858143607|     29.279225043782837|
|         INLAND|477.4475652572126

In [29]:
# Trying out simple UDF

# import the return type

from pyspark.sql.types import FloatType

# import udf class

from pyspark.sql.functions import udf

# define UDF

def cube(value):
  return value*value*value

# Wrap the UDF inside the udf class

cube=udf(cube,FloatType())

data.withColumn('cube_rooms',cube('total_rooms')).show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|   cube_rooms|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    6.81472E8|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|3.57759779E11|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY| 3.15711462E9|
|  3|  -122.25|   37.85|              52.0|     1274.0|         

In [31]:
# Working with MLlib

# Glance of data

data.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 