# Pyspark

## Dataset

In [1]:
from sklearn.datasets import fetch_california_housing

california_housing = fetch_california_housing(as_frame=True)
california_housing_csv = california_housing["data"]
california_housing_csv.to_csv("data/housing.csv", index = False)

## Pandas Version

In [2]:
import pandas as pd 
data_pandas = pd.read_csv("data/housing.csv")
data_pandas

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
0,8.3252,41.0,6.984127,1.023810,322.0,2.555556,37.88,-122.23
1,8.3014,21.0,6.238137,0.971880,2401.0,2.109842,37.86,-122.22
2,7.2574,52.0,8.288136,1.073446,496.0,2.802260,37.85,-122.24
3,5.6431,52.0,5.817352,1.073059,558.0,2.547945,37.85,-122.25
4,3.8462,52.0,6.281853,1.081081,565.0,2.181467,37.85,-122.25
...,...,...,...,...,...,...,...,...
20635,1.5603,25.0,5.045455,1.133333,845.0,2.560606,39.48,-121.09
20636,2.5568,18.0,6.114035,1.315789,356.0,3.122807,39.49,-121.21
20637,1.7000,17.0,5.205543,1.120092,1007.0,2.325635,39.43,-121.22
20638,1.8672,18.0,5.329513,1.171920,741.0,2.123209,39.43,-121.32


### Aggregate Function

In [13]:
data_pandas.groupby(["HouseAge"]).sum().head(20)

Unnamed: 0_level_0,MedInc,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude
HouseAge,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1.0,16.0136,36.352363,6.208422,1314.0,12.97635,144.82,-477.83
2.0,299.7304,443.64706,73.863799,120817.0,159.372502,2058.68,-6909.04
3.0,338.536,424.811934,73.934135,186110.0,169.551917,2187.64,-7374.06
4.0,989.5086,1167.508603,213.192367,537314.0,532.15235,6678.69,-22693.53
5.0,1146.2232,1455.023412,275.235401,603017.0,800.102313,8597.16,-29016.0
6.0,701.3428,933.159425,176.93429,346502.0,461.785257,5637.3,-19039.09
7.0,781.2799,1091.400811,200.395719,457289.0,513.965892,6220.25,-20844.51
8.0,918.6237,1293.779173,242.07051,415781.0,692.312897,7363.23,-24579.97
9.0,884.8672,1232.894073,234.632055,386830.0,594.419328,7315.09,-24457.61
10.0,1069.3342,1596.568521,315.614596,452212.0,753.122942,9481.46,-31521.73


## Spark Version 

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Agg').getOrCreate()

In [9]:
data_spark = spark.read.csv("data/housing.csv", header=True, inferSchema=True)
data_spark.show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|
+------+--------+------------------+------------------+----------+------------------+--------+---------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|
|4.0368|    52.0| 4.761658031088083|1.1036269430051813|     413.0| 2.139896373056995|   37.85|  -122.25|
|3.6591|    52.0|4.9319066147859925|0.9513618677042801|

### Aggregate Function

In [14]:
grouped = data_spark.groupBy('HouseAge').sum()
grouped.show()

+--------+------------------+-------------+------------------+------------------+---------------+------------------+------------------+-------------------+
|HouseAge|       sum(MedInc)|sum(HouseAge)|     sum(AveRooms)|    sum(AveBedrms)|sum(Population)|     sum(AveOccup)|     sum(Latitude)|     sum(Longitude)|
+--------+------------------+-------------+------------------+------------------+---------------+------------------+------------------+-------------------+
|     8.0| 918.6236999999999|       1648.0|1293.7791726272808|242.07050979855802|       415781.0| 692.3128971076675|           7363.23|-24579.969999999972|
|     7.0| 781.2799000000001|       1225.0|1091.4008105186736|200.39571939232053|       457289.0| 513.9658917400823|           6220.25|-20844.510000000006|
|    49.0| 475.7662999999998|       6566.0| 665.5445193661707|139.76969351787423|       132552.0| 364.6753501035802|4826.0199999999995|-16116.939999999995|
|    29.0|1650.5204000000012|      13369.0|  2404.31928230994|50

In [16]:
spark.stop()