<a href="https://colab.research.google.com/github/adrianmarino19/pyspark/blob/master/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('practice').getOrCreate()

In [None]:
# Test your setup

df = spark.range(1000)
df.count()  # Should return 1000

1000

In [None]:
df = spark.read.csv('sample_data/california_housing_test.csv', header=True, inferSchema=True)

In [None]:
df.show(50)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [None]:
# Display the schema and verify data types

df.printSchema()

# Float is double.
# Nullable implies that column can have null values.

# We can also run this df.dtypes only for the data types. Not super useful.

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
# Count total number of rows.

df.count()

3000

In [None]:
# Show basic statistics using describe()

df.describe().show()

+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|    total_bedrooms|        population|        households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               3000|              3000|              3000|             3000|              3000|              3000|              3000|              3000|              3000|
|   mean|-119.58920000000029| 35.63538999999999|28.845333333333333|2599.578666666667| 529.9506666666666|1402.7986666666666|           489.912| 3.807271799999998|        205846.275|
| stddev| 1.9949362939550166|2.1296695233438334|12.555395554955757|2155.593331625582|415.654368

In [None]:
# Check for missing values

# Unfortunately you need to import specific functions.

from pyspark.sql.functions import col, when, count, isnan, isnull

# it's so fucking stupid. Checking nulls values per column...

df.select([
    count(when(isnull(c) | isnan(c), c).alias(c)) for c in df.columns
]).show()

+------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------+
|count(CASE WHEN ((longitude IS NULL) OR isnan(longitude)) 

In [None]:
# How do you calculate the average median_income?

df.agg({'median_income': 'avg'}).show()

+------------------+
|avg(median_income)|
+------------------+
| 3.807271799999998|
+------------------+



In [None]:
# 	How do you get the column names?

df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

# Data types & nulls

In [None]:
# How do you check the data type of each column in the DataFrame?

df.printSchema()
df.dtypes

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

In [None]:
# How do you check if there are any missing values in the entire DataFrame?

from pyspark.sql.functions import count, when, isnan, isnull

# ???


In [None]:
# How would you count how many rows have a null value in the total_bedrooms column?

df.filter(df.total_bedrooms.isNull()).count()

0

## Filter

In [None]:
# Let's train on fitler techniques.

# 🔹 1. Filter rows where median_income is greater than 4

df.filter('median_income > 4').show(10) # so in PySpark you can pass your condition as a string too... Kind of weird. This is given that it has an SQL engine under it.

df.filter(df.median_income > 4).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -121.93|   37.25|              36.0|     1089.0|         182.0|     535.0|     170.0|         4.69|          252600.0|
|  -117.03|   32.97|              16.0|     3936.0|         694.0|    1935.0|     659.0|       4.5625|          231200.0|
|  -117.97|   33.73|    

In [None]:
# 🔹 2. Filter rows where housing_median_age is exactly 52


df.filter(df.housing_median_age == 52).show(5)

print(f'{df.filter(df.housing_median_age == 52).count()} total rows.')

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.42|   37.76|              52.0|     3587.0|        1030.0|    2259.0|     979.0|       2.5403|          250000.0|
|  -122.53|   37.97|              52.0|     1560.0|         451.0|     700.0|     419.0|       2.5125|          270800.0|
|  -121.92|   37.33|              52.0|     2125.0|         382.0|     930.0|     387.0|       5.2831|          299500.0|
|   -118.2|   33.77|              52.0|     1375.0|         457.0|    1089.0|     317.0|       2.2344|          200000.0|
|  -122.43|   37.74|              52.0|     1514.0|         314.0|     724.0|     301.0|       5.3292|          300900.0|
+---------+--------+----

In [None]:
# 🔹 3. Filter rows where median_house_value is less than 100000

df.filter(df.median_house_value < 10000000).show(10)
print(f'{df.filter(df.median_house_value < 1000000).count()} total rows.')

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [None]:
# 🔹 4. Filter rows where population is over 1000 and median_income is under 2

df.filter((df.population > 1000) & (df.median_income < 2)).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -118.45|   34.07|              19.0|     4845.0|        1609.0|    3751.0|    1539.0|        1.583|          350000.0|
|   -118.2|   34.04|              44.0|     1582.0|         544.0|    1998.0|     515.0|       1.6888|          125000.0|
|  -118.28|   34.05|              44.0|      968.0|         384.0|    1805.0|     375.0|       1.4801|          212500.0|
|   -118.3|   34.02|              49.0|     2120.0|         483.0|    1522.0|     416.0|         1.85|          116800.0|
|  -118.26|   33.99|              36.0|     2016.0|         505.0|    1807.0|     464.0|       1.6901|          103500.0|
+---------+--------+----

In [None]:
# 🔹 5. Filter rows where total_rooms is null

df.filter(df.total_rooms.isNull()).count()

0

In [None]:
df.filter((df.latitude == 33) | (df.latitude == 34)).show()

f'{df.filter((df.latitude == 33) | (df.latitude == 34)).count()} rows my friends!'

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -118.11|    34.0|              33.0|     2886.0|         726.0|    2650.0|     728.0|        2.625|          178700.0|
|  -118.41|    34.0|              35.0|     1062.0|         305.0|    1026.0|     307.0|       2.7153|          265500.0|
|  -118.41|    34.0|              38.0|      324.0|          70.0|     268.0|      73.0|         2.55|          271400.0|
|  -118.28|    34.0|              42.0|      855.0|         284.0|     890.0|     247.0|       1.2778|          112500.0|
|  -118.08|    34.0|              32.0|     1165.0|         358.0|     997.0|     361.0|       0.9817|          166300.0|
|   -118.4|    34.0|    

'24 rows my friends!'

In [None]:
#🔹 9. Filter rows where total_bedrooms is greater than total_rooms (just for fun)

f'{df.filter(df.total_bedrooms > df.total_rooms).count()} obvioooo'

'0 obvioooo'

# Basic Descriptive Statistics

## Select

In [None]:
## Select

df.select(df.longitude, df.latitude).show(10)

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
|  -118.36|   33.82|
|  -119.67|   36.33|
|  -119.56|   36.51|
|  -121.43|   38.63|
|  -120.65|   35.48|
|  -122.84|    38.4|
|  -118.02|   34.08|
+---------+--------+
only showing top 10 rows



In [None]:
df.select(df.median_income.alias('income')).show()

+------+
|income|
+------+
|6.6085|
| 3.599|
|5.7934|
|6.1359|
|2.9375|
|1.6635|
|1.6641|
| 3.225|
|3.6696|
|2.3333|
|2.2054|
|2.4167|
|  4.69|
|4.5625|
|5.7121|
|   2.2|
| 1.875|
|2.7174|
|6.5851|
|6.1724|
+------+
only showing top 20 rows



In [None]:
df.select((df.median_house_value/1000).alias('house_value_in_thousands')).show()

+------------------------+
|house_value_in_thousands|
+------------------------+
|                   344.7|
|                   176.5|
|                   270.5|
|                   330.0|
|                    81.7|
|                    67.0|
|                    67.0|
|                   166.9|
|                   194.4|
|                   164.2|
|                   125.0|
|                    58.3|
|                   252.6|
|                   231.2|
|                   222.5|
|                   153.1|
|                   181.3|
|                   137.5|
|                   300.0|
|                   414.3|
+------------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import round

df.select((round(df.total_rooms/df.households,1)).alias('rooms_per_household'), (round(df.total_bedrooms/df.total_rooms, 1)).alias('bedrooms_per_room')).show()

+-------------------+-----------------+
|rooms_per_household|bedrooms_per_room|
+-------------------+-----------------+
|                6.4|              0.2|
|                5.5|              0.2|
|                7.3|              0.1|
|                6.1|              0.2|
|                5.2|              0.2|
|                5.0|              0.2|
|                4.6|              0.2|
|                5.2|              0.2|
|                5.1|              0.2|
|                4.0|              0.3|
|                3.7|              0.3|
|                5.3|              0.2|
|                6.4|              0.2|
|                6.0|              0.2|
|                6.3|              0.2|
|                3.2|              0.2|
|                5.3|              0.2|
|                5.2|              0.2|
|                8.0|              0.1|
|                7.0|              0.1|
+-------------------+-----------------+
only showing top 20 rows



In [None]:
df.select((df.population/100).alias('popu with 2 zeros less')).show()

+----------------------+
|popu with 2 zeros less|
+----------------------+
|                 15.37|
|                  8.09|
|                 14.84|
|                  0.49|
|                   8.5|
|                  6.63|
|                  6.04|
|                 13.41|
|                 14.46|
|                  28.3|
|                 12.88|
|                  5.64|
|                  5.35|
|                 19.35|
|                 12.17|
|                  1.57|
|                  1.89|
|                 16.03|
|                  6.54|
|                  34.5|
+----------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import avg,max

df.select(avg(df.median_income), max(df.median_house_value)).show()

+------------------+-----------------------+
|avg(median_income)|max(median_house_value)|
+------------------+-----------------------+
| 3.807271799999998|               500001.0|
+------------------+-----------------------+



In [None]:
df.filter(df.median_income > 5).select(df.median_income, df.population).show()

+-------------+----------+
|median_income|population|
+-------------+----------+
|       6.6085|    1537.0|
|       5.7934|    1484.0|
|       6.1359|      49.0|
|       5.7121|    1217.0|
|       6.5851|     654.0|
|       6.1724|    3450.0|
|       7.1615|     667.0|
|        6.603|    1030.0|
|       5.5238|     956.0|
|       6.6783|    1269.0|
|      15.0001|     697.0|
|       5.0808|     799.0|
|       5.6385|    1825.0|
|       5.8221|     445.0|
|        5.898|    1055.0|
|       6.1273|    1204.0|
|       6.1561|     728.0|
|       5.2831|     930.0|
|       6.8075|    2703.0|
|        7.494|    2468.0|
+-------------+----------+
only showing top 20 rows



In [None]:
df.select((df.median_house_value / df.population).alias('value_per_person')).show()

+------------------+
|  value_per_person|
+------------------+
|224.26805465191933|
| 218.1705809641533|
|  182.277628032345|
| 6734.693877551021|
| 96.11764705882354|
|101.05580693815988|
| 110.9271523178808|
|124.45935868754661|
|134.43983402489627|
| 58.02120141342756|
| 97.04968944099379|
|103.36879432624113|
|472.14953271028037|
|119.48320413436693|
| 182.8266228430567|
| 975.1592356687898|
| 959.2592592592592|
| 85.77666874610107|
| 458.7155963302752|
|120.08695652173913|
+------------------+
only showing top 20 rows



## col() and loops

In [79]:
from pyspark.sql.functions import col

# for c in df.columns: --> SAME AS
#   print(df[c])

for c in df.columns:
  print(col(c)) # Same exact thing.


Column<'longitude'>
Column<'latitude'>
Column<'housing_median_age'>
Column<'total_rooms'>
Column<'total_bedrooms'>
Column<'population'>
Column<'households'>
Column<'median_income'>
Column<'median_house_value'>


In [71]:
for c in df.columns:
  print(c)

longitude
latitude
housing_median_age
total_rooms
total_bedrooms
population
households
median_income
median_house_value


In [87]:
# first columns using loop

df.select([df[c] for c in df.columns[:3]]).show(5)

+---------+--------+------------------+
|longitude|latitude|housing_median_age|
+---------+--------+------------------+
|  -122.05|   37.37|              27.0|
|   -118.3|   34.26|              43.0|
|  -117.81|   33.78|              27.0|
|  -118.36|   33.82|              28.0|
|  -119.67|   36.33|              19.0|
+---------+--------+------------------+
only showing top 5 rows



In [92]:
from pyspark.sql.functions import upper

df.select([col(c).alias(c.upper()) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|LONGITUDE|LATITUDE|HOUSING_MEDIAN_AGE|TOTAL_ROOMS|TOTAL_BEDROOMS|POPULATION|HOUSEHOLDS|MEDIAN_INCOME|MEDIAN_HOUSE_VALUE|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

In [96]:
df.select([
    df[c] if c == 'longitude' | c == 'latitude' else avg(df[c]) for c in df.columns
])

TypeError: unsupported operand type(s) for |: 'str' and 'str'

## If else

In [99]:
# # This is wrong, no need to use select... which creates a new column.

# df.select([
#     'YES' if c == 'latitude' else 'NO'for c in df.columns
# ]).show()

In [None]:
df.select([df[c] if c == 'longitude' | c == 'latitude' else 'SKIP' for c in df.columns]).show()

In [None]:
[round(df[c], 1) if c == 'median_income' else df[c] for c in df.columns]

# I want to show it!
df.select([round(df[c], 1) if c == 'median_income' else df[c] for c in df.columns]).show()

In [108]:
['KEEP' if c in ['latitude', 'longitude'] else 'SKIP' for c in df.columns]

['KEEP', 'KEEP', 'SKIP', 'SKIP', 'SKIP', 'SKIP', 'SKIP', 'SKIP', 'SKIP']

In [113]:
# There are two types of list comprehensions.
# 1. Obligatory transformation: if else before for loop to do something which every single value.
# 2. Filtering. If afterwards, because you want to decide what to include.

# Question: From cols, build a list with the original name if it starts with "m" (hint: c.startswith('m')), else the uppercase name.

[c for c in df.columns if c.startswith('m')]

['median_income', 'median_house_value']

In [116]:
[df[c] for c in df.columns if c.__contains__('house')]

## The better way was as such:
# [df[c] for c in df.columns if 'house' in c]

[Column<'households'>, Column<'median_house_value'>]

In [117]:
df.select([
    col(c) for c in df.columns if c != 'total_bedrooms' or c != 'population'
]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|    

## groupBy

In [None]:
# FYI
# aggregations are sum, avg, max, etc.
# Why? Because they aggreagate data, the group multiple rows.

In [120]:
df.select(avg(df.median_income)).show()
df.agg(avg(df.median_income)).show() # same! agg is used right after a groupBy.

+------------------+
|avg(median_income)|
+------------------+
| 3.807271799999998|
+------------------+

+------------------+
|avg(median_income)|
+------------------+
| 3.807271799999998|
+------------------+

