
# Install Java, Spark, and Findspark
This installs Apache Spark, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [None]:
%%bash
sudo apt-get update
apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark 3.1.3 only if the archive is not already present.
# archive.apache.org keeps every past release; regular Apache mirrors drop old versions.
[ -e spark-3.1.3-bin-hadoop2.7.tgz ] || wget -q https://archive.apache.org/dist/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz
tar xf spark-3.1.3-bin-hadoop2.7.tgz
pip install -q findspark

# Set Environment Variables
Set the locations where Spark and Java are installed.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop2.7"
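A wrong path here only surfaces later, when findspark or Spark tries to use it. A small check like the hypothetical `missing_homes` helper below (plain `os`, not part of findspark) lists any variable that is unset or does not point at an existing directory; it assumes the Colab paths set above.

```python
import os


def missing_homes(env=os.environ, keys=("JAVA_HOME", "SPARK_HOME")):
    """Return the names in `keys` whose value in `env` is unset or not an existing directory."""
    return [key for key in keys if not os.path.isdir(env.get(key, ""))]


# An empty list means both installation directories were found
print(missing_homes())
```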

# Start a SparkSession
This starts a local Spark session; `local[*]` runs Spark on this machine with one worker thread per logical CPU core.


In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# get a spark session. 
spark = SparkSession.builder.master("local[*]").getOrCreate()
type(spark)

pyspark.sql.session.SparkSession

In [None]:
spark

# Create a DataFrame in Spark

## Dataset
https://storage.googleapis.com/files.mobibootcamp.com/2022-datasets/IRSIncomeByZipCode.csv

## References
https://docs.databricks.com/getting-started/spark/dataframes.html#load-sample-data

https://docs.databricks.com/data/data-sources/read-csv.html




In [None]:
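# Download the dataset only if it is not already present, then load it
! [ -e IRSIncomeByZipCode.csv ] || wget https://storage.googleapis.com/files.mobibootcamp.com/2022-datasets/IRSIncomeByZipCode.csv
df = spark.read.csv('IRSIncomeByZipCode.csv',
                    header=True,       # first line holds the column names
                    inferSchema=True)  # scan the data to pick column types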

print(type(df))

--2022-08-06 00:47:03--  https://storage.googleapis.com/files.mobibootcamp.com/2022-datasets/IRSIncomeByZipCode.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 142.251.2.128, 2607:f8b0:4023:c0d::80
Connecting to storage.googleapis.com (storage.googleapis.com)|142.251.2.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2293095 (2.2M) [text/csv]
Saving to: ‘IRSIncomeByZipCode.csv’


2022-08-06 00:47:03 (190 MB/s) - ‘IRSIncomeByZipCode.csv’ saved [2293095/2293095]

<class 'pyspark.sql.dataframe.DataFrame'>


In [None]:
print(df.columns)

['index', 'STATE', 'ZIPCODE', 'Number of returns', 'Adjusted gross income (AGI)', 'Avg AGI', 'Number of returns with total income', 'Total income amount', 'Avg total income', 'Number of returns with taxable income', 'Taxable income amount', 'Avg taxable income']


In [None]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIPCODE: integer (nullable = true)
 |-- Number of returns: integer (nullable = true)
 |-- Adjusted gross income (AGI): integer (nullable = true)
 |-- Avg AGI: double (nullable = true)
 |-- Number of returns with total income: integer (nullable = true)
 |-- Total income amount: integer (nullable = true)
 |-- Avg total income: double (nullable = true)
 |-- Number of returns with taxable income: integer (nullable = true)
 |-- Taxable income amount: integer (nullable = true)
 |-- Avg taxable income: double (nullable = true)



In [None]:
df.show()

+-----+-----+-------+-----------------+---------------------------+-----------+-----------------------------------+-------------------+----------------+-------------------------------------+---------------------+------------------+
|index|STATE|ZIPCODE|Number of returns|Adjusted gross income (AGI)|    Avg AGI|Number of returns with total income|Total income amount|Avg total income|Number of returns with taxable income|Taxable income amount|Avg taxable income|
+-----+-----+-------+-----------------+---------------------------+-----------+-----------------------------------+-------------------+----------------+-------------------------------------+---------------------+------------------+
|    0|   AL|      0|          2022380|                  105089761|51.96340994|                            2022380|          106420533|     52.62143267|                              1468370|             67850874|       46.20829491|
|    1|   AL|  35004|             4930|                     255534|51.83

## Cleaning step

### Change column names


In [None]:
# Lower-case every column name and replace spaces with underscores
df = df.toDF(*[value.lower().replace(' ', '_') for value in df.columns])

In [None]:
df.columns

['index',
 'state',
 'zipcode',
 'number_of_returns',
 'adjusted_gross_income_(agi)',
 'avg_agi',
 'number_of_returns_with_total_income',
 'total_income_amount',
 'avg_total_income',
 'number_of_returns_with_taxable_income',
 'taxable_income_amount',
 'avg_taxable_income']

In [None]:
df = df.withColumnRenamed('adjusted_gross_income_(agi)','agi')

In [None]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- number_of_returns: integer (nullable = true)
 |-- agi: integer (nullable = true)
 |-- avg_agi: double (nullable = true)
 |-- number_of_returns_with_total_income: integer (nullable = true)
 |-- total_income_amount: integer (nullable = true)
 |-- avg_total_income: double (nullable = true)
 |-- number_of_returns_with_taxable_income: integer (nullable = true)
 |-- taxable_income_amount: integer (nullable = true)
 |-- avg_taxable_income: double (nullable = true)



In [None]:
df = (df.withColumnRenamed("number_of_returns", "returns")
        .withColumnRenamed("number_of_returns_with_total_income", "returns_total_income")
        .withColumnRenamed("number_of_returns_with_taxable_income", "returns_taxable_income"))

In [None]:
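# Alternative (not used): select() with alias() also renames columns, but select()
# keeps only the columns it lists, so all remaining columns would have to be listed too.
#from pyspark.sql.functions import col
#
#df = df.select(col("number_of_returns").alias("returns"),
#  col("number_of_returns_with_total_income").alias("returns_total_income"),
#  col("number_of_returns_with_taxable_income").alias("returns_taxable_income"))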


In [None]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- returns: integer (nullable = true)
 |-- agi: integer (nullable = true)
 |-- avg_agi: double (nullable = true)
 |-- returns_total_income: integer (nullable = true)
 |-- total_income_amount: integer (nullable = true)
 |-- avg_total_income: double (nullable = true)
 |-- returns_taxable_income: integer (nullable = true)
 |-- taxable_income_amount: integer (nullable = true)
 |-- avg_taxable_income: double (nullable = true)
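The renaming above happens in two passes: normalize every header, then shorten four long names. The plain-Python sketch below (no Spark needed) mirrors that logic on the original header list so the mapping can be checked at a glance; `normalize`, `clean_columns` and `RENAMES` are illustrative names, not part of PySpark.

```python
from typing import List

ORIGINAL_COLUMNS = [
    "index", "STATE", "ZIPCODE", "Number of returns",
    "Adjusted gross income (AGI)", "Avg AGI",
    "Number of returns with total income", "Total income amount",
    "Avg total income", "Number of returns with taxable income",
    "Taxable income amount", "Avg taxable income",
]

# Explicit renames applied after normalization (same pairs as the withColumnRenamed calls)
RENAMES = {
    "adjusted_gross_income_(agi)": "agi",
    "number_of_returns": "returns",
    "number_of_returns_with_total_income": "returns_total_income",
    "number_of_returns_with_taxable_income": "returns_taxable_income",
}


def normalize(name: str) -> str:
    """Lower-case a column name and replace spaces with underscores."""
    return name.lower().replace(" ", "_")


def clean_columns(columns: List[str]) -> List[str]:
    """Normalize every name, then apply the explicit renames; other names pass through."""
    return [RENAMES.get(normalize(c), normalize(c)) for c in columns]


cleaned = clean_columns(ORIGINAL_COLUMNS)
print(cleaned)
```

On the freshly loaded DataFrame, the same helper could be applied in one step with `df.toDF(*clean_columns(df.columns))`, since `toDF` takes the new names as positional arguments.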



In [None]:
df.show(5)

+-----+-----+-------+-------+---------+-----------+--------------------+-------------------+----------------+----------------------+---------------------+------------------+
|index|state|zipcode|returns|      agi|    avg_agi|returns_total_income|total_income_amount|avg_total_income|returns_taxable_income|taxable_income_amount|avg_taxable_income|
+-----+-----+-------+-------+---------+-----------+--------------------+-------------------+----------------+----------------------+---------------------+------------------+
|    0|   AL|      0|2022380|105089761|51.96340994|             2022380|          106420533|     52.62143267|               1468370|             67850874|       46.20829491|
|    1|   AL|  35004|   4930|   255534|51.83245436|                4930|             258024|     52.33752535|                  4020|               163859|       40.76094527|
|    2|   AL|  35005|   3300|   128387|38.90515152|                3300|             129390|     39.20909091|                  244

# Statistics on columns

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns
result = df.select(['zipcode','returns','agi','returns_total_income','total_income_amount','returns_taxable_income','taxable_income_amount']).describe()
result.show()

+-------+------------------+------------------+--------------------+--------------------+--------------------+----------------------+---------------------+
|summary|           zipcode|           returns|                 agi|returns_total_income| total_income_amount|returns_taxable_income|taxable_income_amount|
+-------+------------------+------------------+--------------------+--------------------+--------------------+----------------------+---------------------+
|  count|             27790|             27790|               27790|               27790|               27790|                 27790|                27790|
|   mean|48853.603274559195|10346.201151493344|   657743.3184598776|  10346.191795609931|   667443.1108312343|     7882.510975170925|    451150.8901763224|
| stddev|27140.437868919653| 180653.3663797637|1.2085373416129865E7|  180653.18933603962|1.2262993235964034E7|     136768.3061582784|    8310008.520543394|
|    min|                 0|                90|                1
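`describe()` reports five rows per numeric column: count, mean, stddev, min and max, where stddev is the sample standard deviation (divided by n - 1). The stdlib sketch below reproduces those five numbers for one column; `describe_column` is an illustrative name, and the sample values are the `returns` of the first five real Alabama ZIP codes that appear later in this notebook, used only as an example.

```python
import statistics


def describe_column(values):
    """Mimic the count/mean/stddev/min/max summary that DataFrame.describe() shows."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        # Sample standard deviation (n - 1 denominator), which is what Spark's stddev computes
        "stddev": statistics.stdev(values),
        "min": min(values),
        "max": max(values),
    }


# `returns` for Alabama ZIP codes 35004, 35005, 35006, 35007 and 35010
sample_returns = [4930, 3300, 1230, 11990, 8320]
summary = describe_column(sample_returns)
print(summary)
```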

## More Analysis

In [None]:
print(" The IRS dataframe has {} records".format(df.count()))

 The IRS dataframe has 27790 records


In [None]:
df1 = df.drop('index','zipcode','avg_agi','avg_total_income','avg_taxable_income')
df1.printSchema()

root
 |-- state: string (nullable = true)
 |-- returns: integer (nullable = true)
 |-- agi: integer (nullable = true)
 |-- returns_total_income: integer (nullable = true)
 |-- total_income_amount: integer (nullable = true)
 |-- returns_taxable_income: integer (nullable = true)
 |-- taxable_income_amount: integer (nullable = true)



In [None]:
result = df1.groupBy('state').sum()
result.show()

+-----+------------+----------+-------------------------+------------------------+---------------------------+--------------------------+
|state|sum(returns)|  sum(agi)|sum(returns_total_income)|sum(total_income_amount)|sum(returns_taxable_income)|sum(taxable_income_amount)|
+-----+------------+----------+-------------------------+------------------------+---------------------------+--------------------------+
|   AZ|     5530920| 316364406|                  5530910|               320757412|                    4172440|                 209631328|
|   SC|     4156340| 215111572|                  4156360|               218070778|                    3058070|                 137769156|
|   LA|     3959820| 222866984|                  3959840|               226109894|                    2909800|                 151376140|
|   MN|     5245530| 348804760|                  5245540|               355257206|                    4221490|                 242347210|
|   NJ|     8540010| 678581630|   

In [None]:
result.describe().show()

+-------+-----+-----------------+--------------------+-------------------------+------------------------+---------------------------+--------------------------+
|summary|state|     sum(returns)|            sum(agi)|sum(returns_total_income)|sum(total_income_amount)|sum(returns_taxable_income)|sum(taxable_income_amount)|
+-------+-----+-----------------+--------------------+-------------------------+------------------------+---------------------------+--------------------------+
|  count|   51|               51|                  51|                       51|                      51|                         51|                        51|
|   mean| null|5637665.294117647|3.5840562392156863E8|        5637660.196078432|     3.636910598039216E8|           4295195.68627451|      2.4583300466666666E8|
| stddev| null|6340579.692344021| 4.399834661626695E8|        6340575.484253614|     4.464334416513961E8|          4776368.896045302|       3.029390305104617E8|
|    min|   AK|           557720| 

In [None]:
# States ordered by total AGI, largest first
result.orderBy(['sum(agi)'], ascending=[False]).show()

+-----+------------+----------+-------------------------+------------------------+---------------------------+--------------------------+
|state|sum(returns)|  sum(agi)|sum(returns_total_income)|sum(total_income_amount)|sum(returns_taxable_income)|sum(taxable_income_amount)|
+-----+------------+----------+-------------------------+------------------------+---------------------------+--------------------------+
|   CA|    33726920|2418380792|                 33726890|              2456689822|                   25669180|                1624946162|
|   TX|    23465220|1496710822|                 23465190|              1515344430|                   17426140|                1057913480|
|   NY|    18624470|1427198432|                 18624450|              1447671582|                   14050890|                 997471116|
|   FL|    18263140|1081470238|                 18263140|              1096201886|                   13340580|                 753079248|
|   IL|    12047780| 811144188|   

In [None]:
result.orderBy(['sum(agi)'], ascending = [True]).show()

+-----+------------+---------+-------------------------+------------------------+---------------------------+--------------------------+
|state|sum(returns)| sum(agi)|sum(returns_total_income)|sum(total_income_amount)|sum(returns_taxable_income)|sum(taxable_income_amount)|
+-----+------------+---------+-------------------------+------------------------+---------------------------+--------------------------+
|   VT|      631370| 35173134|                   631370|                35912096|                     497030|                  23794848|
|   WY|      557720| 38997814|                   557720|                39577282|                     446580|                  28652500|
|   AK|      710340| 46067816|                   710340|                46763164|                     596480|                  33978970|
|   SD|      810000| 46405460|                   809990|                47501124|                     637660|                  32830756|
|   ND|      714670| 49796360|           

In [None]:
result.printSchema()

root
 |-- state: string (nullable = true)
 |-- sum(returns): long (nullable = true)
 |-- sum(agi): long (nullable = true)
 |-- sum(returns_total_income): long (nullable = true)
 |-- sum(total_income_amount): long (nullable = true)
 |-- sum(returns_taxable_income): long (nullable = true)
 |-- sum(taxable_income_amount): long (nullable = true)



In [None]:
type(result)

pyspark.sql.dataframe.DataFrame

## Top 10 richest states by average AGI per return



In [None]:
from pyspark.sql.functions import lit

# Average AGI per return for each state = total AGI / total number of returns
result.select("state", "sum(returns)",
              lit(result["sum(agi)"] / result["sum(returns)"]).alias("avg_agi")
             ).sort(["avg_agi"], ascending=[False]).show(10)

+-----+------------+-----------------+
|state|sum(returns)|          avg_agi|
+-----+------------+-----------------+
|   CT|     3453600|90.07026696780171|
|   DC|      653350|84.20220096426111|
|   MA|     6520440| 81.2116393985682|
|   NJ|     8540010|79.45911421649389|
|   NY|    18624470|76.63028435171579|
|   CA|    33726920|71.70476260506445|
|   MD|     5820970|70.52357012662839|
|   VA|     7584510|70.26709859964586|
|   WY|      557720|69.92364268808721|
|   ND|      714670|69.67741754935845|
+-----+------------+-----------------+
only showing top 10 rows
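The state figure computed above, `sum(agi) / sum(returns)`, is a return-weighted average: ZIP codes with many returns count more. It generally differs from the plain mean of the per-ZIP `avg_agi` column. The stdlib sketch below compares the two on the first five real Alabama ZIP codes from this dataset, as (returns, agi) pairs; the helper names are illustrative.

```python
def weighted_avg_agi(rows):
    """Total AGI divided by total returns: what sum(agi) / sum(returns) computes."""
    total_returns = sum(returns for returns, _ in rows)
    total_agi = sum(agi for _, agi in rows)
    return total_agi / total_returns


def unweighted_avg_agi(rows):
    """Plain mean of the per-ZIP averages, ignoring how many returns each ZIP has."""
    return sum(agi / returns for returns, agi in rows) / len(rows)


# (returns, agi) for Alabama ZIP codes 35004, 35005, 35006, 35007 and 35010
al_rows = [(4930, 255534), (3300, 128387), (1230, 58302), (11990, 643708), (8320, 378497)]

print(round(weighted_avg_agi(al_rows), 2))    # 49.19
print(round(unweighted_avg_agi(al_rows), 2))  # 47.46
```

The weighted version is the right one for a state-level "average AGI per return", which is why the notebook divides the sums rather than averaging `avg_agi`.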



## 10 poorest states by average AGI per return

In [None]:
result.select("state", "sum(returns)",
              lit(result["sum(agi)"] / result["sum(returns)"]).alias("avg_agi")
             ).sort(["avg_agi"], ascending=[True]).show(10)

+-----+------------+------------------+
|state|sum(returns)|           avg_agi|
+-----+------------+------------------+
|   MS|     2458630| 46.27042539951111|
|   WV|     1552380| 48.74156843041008|
|   NM|     1779870| 50.20755111328356|
|   KY|     3720070| 50.73022819463075|
|   AR|     2407010| 51.61637882684326|
|   SC|     4156340|51.755046988456186|
|   AL|     4046400|51.942349248714905|
|   ME|     1254060| 52.51276015501651|
|   ID|     1355750| 52.79056463212244|
|   MT|      954110| 52.83129198939326|
+-----+------------+------------------+
only showing top 10 rows



## Across the 50 states and DC, the average AGI per return ranges from about 46 (MS) to 90 (CT), in thousands of dollars

## Results for the state of Michigan

In [None]:
result.filter(result["state"]=='MI').show()

+-----+------------+---------+-------------------------+------------------------+---------------------------+--------------------------+
|state|sum(returns)| sum(agi)|sum(returns_total_income)|sum(total_income_amount)|sum(returns_taxable_income)|sum(taxable_income_amount)|
+-----+------------+---------+-------------------------+------------------------+---------------------------+--------------------------+
|   MI|     9186100|508666492|                  9186140|               515692796|                    6787490|                 345700044|
+-----+------------+---------+-------------------------+------------------------+---------------------------+--------------------------+



In [None]:
result.filter(result["state"]=='MI').select("state","sum(returns)","sum(agi)", 
                                            lit(result["sum(agi)"]/result["sum(returns)"]).alias("avg_agi"),
                                            lit(result["sum(total_income_amount)"]/result["sum(returns_total_income)"]).alias("avg_total_income"),
                                            lit(result["sum(taxable_income_amount)"]/result["sum(returns_taxable_income)"]).alias("avg_taxable_income")).show()

+-----+------------+---------+------------------+----------------+------------------+
|state|sum(returns)| sum(agi)|           avg_agi|avg_total_income|avg_taxable_income|
+-----+------------+---------+------------------+----------------+------------------+
|   MI|     9186100|508666492|55.373498220136945|56.1381381080628|50.931941557188296|
+-----+------------+---------+------------------+----------------+------------------+



# Analyzing data by ZIP code across the USA




In [None]:
df.show(5)

+-----+-----+-------+-------+---------+-----------+--------------------+-------------------+----------------+----------------------+---------------------+------------------+
|index|state|zipcode|returns|      agi|    avg_agi|returns_total_income|total_income_amount|avg_total_income|returns_taxable_income|taxable_income_amount|avg_taxable_income|
+-----+-----+-------+-------+---------+-----------+--------------------+-------------------+----------------+----------------------+---------------------+------------------+
|    0|   AL|      0|2022380|105089761|51.96340994|             2022380|          106420533|     52.62143267|               1468370|             67850874|       46.20829491|
|    1|   AL|  35004|   4930|   255534|51.83245436|                4930|             258024|     52.33752535|                  4020|               163859|       40.76094527|
|    2|   AL|  35005|   3300|   128387|38.90515152|                3300|             129390|     39.20909091|                  244

In [None]:
# Register the DataFrame as a temporary SQL view (registerTempTable is deprecated)
df.createOrReplaceTempView("mytable")

# spark.sql() returns a new DataFrame; nothing is computed until an action such as show()
spark.sql("""
    SELECT * FROM mytable
""")

DataFrame[index: int, state: string, zipcode: int, returns: int, agi: int, avg_agi: double, returns_total_income: int, total_income_amount: int, avg_total_income: double, returns_taxable_income: int, taxable_income_amount: int, avg_taxable_income: double]

In [None]:
zipcode_data = spark.sql("""
    SELECT zipcode, count(zipcode) 
    FROM mytable 
    GROUP BY zipcode
    HAVING count(zipcode)>1
""")

zipcode_data.show()


+-------+--------------+
|zipcode|count(zipcode)|
+-------+--------------+
|  99999|            51|
|      0|            51|
+-------+--------------+
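The `GROUP BY zipcode HAVING count(zipcode) > 1` query keeps only ZIP codes that occur more than once. The same idea in plain Python uses `collections.Counter`; `duplicated_values` is an illustrative name, and the ZIP list below is a small made-up sample, not taken from the dataset.

```python
from collections import Counter


def duplicated_values(values):
    """Return {value: count} for values that occur more than once (GROUP BY ... HAVING count > 1)."""
    counts = Counter(values)
    return {value: n for value, n in counts.items() if n > 1}


# Made-up sample: two states, each with its own 0 and 99999 placeholder rows
sample_zips = [0, 35004, 35005, 99999, 0, 99501, 99999]
print(duplicated_values(sample_zips))  # {0: 2, 99999: 2}
```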



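### Removing the placeholder ZIP codes 0 and 99999

Each state has one row with ZIP code 0 and one with ZIP code 99999; neither is a real ZIP code. The ZIP code 0 row holds the state total: Alabama's row reports 2,022,380 returns, about half of the 4,046,400 returns that `groupBy('state').sum()` produced for Alabama, so the state-level sums above count each state roughly twice (the per-return averages, being ratios, are largely unaffected). The 99999 row appears to aggregate ZIP codes that are not reported individually. Both are filtered out before the ZIP-level analysis.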

In [None]:
zipcode_data = df.drop('index','avg_agi','avg_total_income','avg_taxable_income')

In [None]:
# Keep only real ZIP codes
zipcode_data = zipcode_data.filter(~zipcode_data.zipcode.isin(0, 99999))
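Why this filter matters for totals: Alabama's ZIP code 0 row reports 2,022,380 returns, while `groupBy('state').sum()` gave 4,046,400 for Alabama, so that row appears to be the state total and summing every row counts each state about twice. The stdlib sketch below uses a made-up state "XX" with hypothetical numbers to show that skipping ZIP codes 0 and 99999 removes the double count, and that a ratio such as AGI per return survives the doubling; `sum_by_state` is an illustrative helper.

```python
def sum_by_state(rows, skip_zips=frozenset()):
    """Sum (returns, agi) per state, ignoring rows whose ZIP code is in skip_zips."""
    totals = {}
    for state, zipcode, returns, agi in rows:
        if zipcode in skip_zips:
            continue
        prev_returns, prev_agi = totals.get(state, (0, 0))
        totals[state] = (prev_returns + returns, prev_agi + agi)
    return totals


# Hypothetical state "XX": the ZIP 0 row equals the sum of all other rows
toy_rows = [
    ("XX", 0, 300, 15000),      # state total
    ("XX", 10001, 100, 4000),
    ("XX", 10002, 150, 8000),
    ("XX", 99999, 50, 3000),    # catch-all row (assumed meaning)
]

naive = sum_by_state(toy_rows)
filtered = sum_by_state(toy_rows, skip_zips={0, 99999})
print(naive)     # {'XX': (600, 30000)}: twice the true 300 returns
print(filtered)  # {'XX': (250, 12000)}: real ZIP codes only
print(naive["XX"][1] / naive["XX"][0])  # 50.0, same as 15000 / 300
```

In the notebook, a state-level sum without the double count could be written as `df.filter(~df.zipcode.isin(0, 99999)).groupBy('state').sum()`, at the cost of also dropping the 99999 catch-all rows.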

In [None]:
zipcode_data.createOrReplaceTempView("mytable")  # replaces the earlier, unfiltered "mytable" view


In [None]:
spark.sql("""
      SELECT * FROM mytable
      """).show(5)

+-----+-------+-------+------+--------------------+-------------------+----------------------+---------------------+
|state|zipcode|returns|   agi|returns_total_income|total_income_amount|returns_taxable_income|taxable_income_amount|
+-----+-------+-------+------+--------------------+-------------------+----------------------+---------------------+
|   AL|  35004|   4930|255534|                4930|             258024|                  4020|               163859|
|   AL|  35005|   3300|128387|                3300|             129390|                  2440|                70760|
|   AL|  35006|   1230| 58302|                1230|              58585|                   940|                36341|
|   AL|  35007|  11990|643708|               11990|             651350|                  9280|               414878|
|   AL|  35010|   8320|378497|                8320|             382106|                  5610|               226514|
+-----+-------+-------+------+--------------------+-------------

In [None]:
spark.sql("""
    SELECT COUNT(*) FROM mytable 
    where zipcode=0 OR zipcode = 99999;
    """).show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [None]:
spark.sql("""
         SELECT * FROM mytable
         ORDER BY agi DESC
         LIMIT 10
         """).show()

+-----+-------+-------+--------+--------------------+-------------------+----------------------+---------------------+
|state|zipcode|returns|     agi|returns_total_income|total_income_amount|returns_taxable_income|taxable_income_amount|
+-----+-------+-------+--------+--------------------+-------------------+----------------------+---------------------+
|   NY|  10021|  23860|11744436|               23860|           11877178|                 21520|              9137327|
|   NY|  10128|  32510|10821609|               32510|           10972659|                 29020|              8375493|
|   NY|  10023|  33900|10708893|               33900|           10861855|                 29980|              8628787|
|   NY|  10022|  19780|10036729|               19780|           10328745|                 17810|              6750437|
|   NY|  10024|  30820| 9152767|               30820|            9317798|                 27140|              7219049|
|   NY|  10028|  25370| 8970406|               2

In [None]:
spark.sql("""
         SELECT state, zipcode, returns, (agi/returns) as average_agi FROM mytable
         ORDER BY average_agi DESC
         LIMIT 10
         """).show()

+-----+-------+-------+------------------+
|state|zipcode|returns|       average_agi|
+-----+-------+-------+------------------+
|   FL|  33109|    240|         1815.5375|
|   CA|  94027|   3230| 1149.294427244582|
|   CA|  94301|   8650|1006.3246242774567|
|   NY|  10005|   5660| 944.1335689045936|
|   NJ|   7931|   1640| 886.4914634146342|
|   NJ|   7976|    710| 837.3830985915492|
|   CA|  90067|   3190|  811.582131661442|
|   NJ|   7078|   5880| 688.3840136054422|
|   PA|  19035|   2020|  680.859900990099|
|   NY|  10577|   1520| 670.8940789473684|
+-----+-------+-------+------------------+



In [None]:
# Assumes zip_codes_states.csv (a ZIP code -> city/state lookup) was uploaded to /content beforehand
zip_states = spark.read.csv('/content/zip_codes_states.csv', header=True, inferSchema=True)
zip_states.printSchema()

root
 |-- zip_code: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)



In [None]:
zip_states.createOrReplaceTempView('zip_states')

## Top 10 richest ZIP codes across the USA by average AGI per return, with city names

In [None]:
spark.sql("""
         SELECT mytable.state, mytable.zipcode, zip_states.city, returns, ROUND(agi/returns, 2) as average_agi 
         FROM mytable
         LEFT JOIN zip_states ON mytable.zipcode = zip_states.zip_code
         ORDER BY average_agi DESC
         LIMIT 10
         """).show()

+-----+-------+-----------+-------+-----------+
|state|zipcode|       city|returns|average_agi|
+-----+-------+-----------+-------+-----------+
|   FL|  33109|Miami Beach|    240|    1815.54|
|   CA|  94027|   Atherton|   3230|    1149.29|
|   CA|  94301|  Palo Alto|   8650|    1006.32|
|   NY|  10005|   New York|   5660|     944.13|
|   NJ|   7931|  Far Hills|   1640|     886.49|
|   NJ|   7976| New Vernon|    710|     837.38|
|   CA|  90067|Los Angeles|   3190|     811.58|
|   NJ|   7078|Short Hills|   5880|     688.38|
|   PA|  19035|   Gladwyne|   2020|     680.86|
|   NY|  10577|   Purchase|   1520|     670.89|
+-----+-------+-----------+-------+-----------+
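Two details of this join: a LEFT JOIN keeps every row of `mytable` even when `zip_states` has no matching `zip_code`, in which case `city` is null (this happens for Michigan ZIP code 48168 in the Michigan section); and because ZIP codes were read as integers, leading zeros are dropped, so New Jersey's 07931 prints as 7931. The join still matches because both sides are integers. The stdlib sketch below mimics both points; `left_join_city` and `format_zip` are hypothetical helpers, and the dictionary lookup assumes each ZIP code appears at most once in the lookup table (a real LEFT JOIN would repeat rows for duplicate keys).

```python
def left_join_city(zip_rows, zip_to_city):
    """Attach a city to each (state, zipcode) row; None plays the role of SQL NULL."""
    return [(state, zipcode, zip_to_city.get(zipcode)) for state, zipcode in zip_rows]


def format_zip(zipcode):
    """Render an integer ZIP code as the usual 5-digit string, restoring leading zeros."""
    return str(zipcode).zfill(5)


# Tiny lookup taken from the results above; 48168 is deliberately absent
zip_to_city = {33109: "Miami Beach", 7931: "Far Hills"}
rows = [("FL", 33109), ("NJ", 7931), ("MI", 48168)]

for state, zipcode, city in left_join_city(rows, zip_to_city):
    print(state, format_zip(zipcode), city)
# FL 33109 Miami Beach
# NJ 07931 Far Hills
# MI 48168 None
```

In Spark, `pyspark.sql.functions.lpad(col.cast('string'), 5, '0')` would produce the same 5-digit strings for display.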



## 10 poorest ZIP codes across the USA by average AGI per return, with city names

In [None]:
spark.sql("""
         SELECT mytable.state, mytable.zipcode, zip_states.city, returns, ROUND(agi/returns, 2) as average_agi 
         FROM mytable
         LEFT JOIN zip_states ON mytable.zipcode = zip_states.zip_code
         ORDER BY average_agi ASC
         LIMIT 10
         """).show()

+-----+-------+--------------+-------+-----------+
|state|zipcode|          city|returns|average_agi|
+-----+-------+--------------+-------+-----------+
|   IN|  47406|   Bloomington|    170|       8.72|
|   MO|  64147|   Kansas City|    200|      12.22|
|   MN|  55455|   Minneapolis|    150|       14.1|
|   MO|  63140|   Saint Louis|    100|      15.11|
|   SD|  57658|       Wakpala|    110|      15.44|
|   UT|  84112|Salt Lake City|    320|      16.15|
|   WI|  53706|       Madison|    150|       17.2|
|   TN|  38126|       Memphis|   2290|       17.4|
|   OH|  45225|    Cincinnati|   3120|      17.95|
|   KY|  40982|         Scalf|    110|      18.05|
+-----+-------+--------------+-------+-----------+



# Analyzing Michigan data

In [None]:
michigan_data = zipcode_data.filter(zipcode_data.state =='MI')
michigan_data.show(5)

+-----+-------+-------+------+--------------------+-------------------+----------------------+---------------------+
|state|zipcode|returns|   agi|returns_total_income|total_income_amount|returns_taxable_income|taxable_income_amount|
+-----+-------+-------+------+--------------------+-------------------+----------------------+---------------------+
|   MI|  48001|   5870|300488|                5870|             304026|                  4400|               198625|
|   MI|  48002|   1520| 81051|                1520|              82078|                  1190|                53257|
|   MI|  48003|   3010|167060|                3010|             169027|                  2370|               111010|
|   MI|  48005|   2610|167384|                2610|             169652|                  2120|               117640|
|   MI|  48006|   1770| 91790|                1770|              92800|                  1370|                59106|
+-----+-------+-------+------+--------------------+-------------

In [None]:
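# Register the Michigan rows as a view before querying them
michigan_data.createOrReplaceTempView('MI_table')

spark.sql("""
          SELECT COUNT(*) FROM MI_table
          """).show()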

+--------+
|count(1)|
+--------+
|     890|
+--------+



Michigan has 979 active ZIP codes, but the filtered data contains only 890, so some are missing; they were most likely folded into the ZIP code 99999 row, which was filtered out above.


Minimum and maximum total AGI of a single ZIP code in Michigan (in thousands of dollars)

In [None]:
michigan_data.createOrReplaceTempView('MI_table')

spark.sql("""
          SELECT  MIN(agi), MAX(agi) FROM MI_table
          """).show()

+--------+--------+
|min(agi)|max(agi)|
+--------+--------+
|    3543| 2519849|
+--------+--------+



## Top 15 richest Michigan ZIP codes by average AGI per return, with city names

In [None]:
spark.sql("""
         SELECT MI_table.zipcode, zip_states.city, returns, ROUND(agi/returns, 2) as average_agi 
         FROM MI_table
         LEFT JOIN zip_states ON MI_table.zipcode = zip_states.zip_code
         ORDER BY average_agi DESC
         LIMIT 15
         """).show()

+-------+----------------+-------+-----------+
|zipcode|            city|returns|average_agi|
+-------+----------------+-------+-----------+
|  48304|Bloomfield Hills|   8350|     265.62|
|  48009|      Birmingham|  10310|     244.41|
|  48302|Bloomfield Hills|   8200|     229.25|
|  48301|Bloomfield Hills|   7120|      207.4|
|  49301|             Ada|   9010|     174.78|
|  48025|        Franklin|   7430|      168.6|
|  48168|            null|  10770|     150.57|
|  48323| West Bloomfield|   8920|     140.96|
|  48374|            Novi|   6890|      139.5|
|  48070|Huntington Woods|   3080|     139.32|
|  48306|       Rochester|  13010|     136.21|
|  49060| Hickory Corners|    860|     130.72|
|  48363|         Oakland|   2750|     130.14|
|  48236|   Grosse Pointe|  15740|     127.23|
|  48098|            Troy|  10070|     122.87|
+-------+----------------+-------+-----------+



## Bottom 15 Michigan ZIP codes by average AGI per return (all at or below 24.29), with city names

In [None]:
spark.sql("""
         SELECT MI_table.zipcode, zip_states.city, returns, ROUND(agi/returns, 2) as average_agi 
         FROM MI_table
         LEFT JOIN zip_states ON MI_table.zipcode = zip_states.zip_code
         ORDER BY average_agi ASC
         LIMIT 15
         """).show()

+-------+---------+-------+-----------+
|zipcode|     city|returns|average_agi|
+-------+---------+-------+-----------+
|  48505|    Flint|   8110|      19.73|
|  48212|Hamtramck|  12880|      20.54|
|  48213|  Detroit|   8370|      21.31|
|  48607|  Saginaw|    550|      21.35|
|  48210|  Detroit|   8120|      22.16|
|  48211|  Detroit|   1960|      22.17|
|  48238|  Detroit|  10220|      22.52|
|  48342|  Pontiac|   7370|      22.55|
|  48228|  Detroit|  19260|      22.66|
|  48204|  Detroit|   9120|      22.77|
|  48205|  Detroit|  14300|       22.8|
|  48209|  Detroit|   9440|       23.0|
|  48208|  Detroit|   3140|       24.0|
|  48234|  Detroit|  13170|      24.01|
|  48206|  Detroit|   6370|      24.29|
+-------+---------+-------+-----------+



# Michigan's average AGI per return is about 55 (thousands of dollars), and across its ZIP codes it ranges from 19.73 to 265.62.