In [27]:
from pyspark.sql import SparkSession

In [28]:
import getpass

# The warehouse directory below is derived from the current OS user name.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession that runs on YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [10]:
!hadoop fs -ls /public/sms/users

Found 10 items
-rw-r--r--   2 hdfs supergroup   29485784 2021-01-28 11:30 /public/sms/users/users_01.json
-rw-r--r--   2 hdfs supergroup   29609878 2021-01-28 09:18 /public/sms/users/users_02.json
-rw-r--r--   2 hdfs supergroup   29614734 2021-01-28 09:07 /public/sms/users/users_03.json
-rw-r--r--   2 hdfs supergroup   29598763 2021-01-28 09:00 /public/sms/users/users_04.json
-rw-r--r--   2 hdfs supergroup   29614029 2021-01-28 11:13 /public/sms/users/users_05.json
-rw-r--r--   2 hdfs supergroup   29600501 2021-01-28 11:03 /public/sms/users/users_06.json
-rw-r--r--   2 hdfs supergroup   29597776 2021-01-28 11:01 /public/sms/users/users_07.json
-rw-r--r--   2 hdfs supergroup   29601485 2021-01-28 08:51 /public/sms/users/users_08.json
-rw-r--r--   2 hdfs supergroup   29589818 2021-01-28 11:19 /public/sms/users/users_09.json
-rw-r--r--   2 hdfs supergroup   29600090 2021-01-28 09:20 /public/sms/users/users_10.json


In [26]:
# Stops the Spark session bound to `spark`.
# NOTE(review): every later cell that uses `spark` needs the session-creation cell
# to be run again first — confirm this cell is only meant as a manual reset.
spark.stop()

In [4]:
!hadoop fs -head /public/sms/users/users_01.json

{"user_id":1,"user_first_name":"Lezley","user_last_name":"D'Alessio","user_email":"ldalessio0@google.com.au","user_gender":"Male","user_phone_numbers":["5639521582","8433335556","9193704732","8326122969"],"user_address":{"street":"28470 Di Loreto Point","city":"Albany","state":"New York","postal_code":"12222"}}
{"user_id":2,"user_first_name":"Boot","user_last_name":"Cheetam","user_email":"bcheetam1@alexa.com","user_gender":"Male","user_phone_numbers":["9169982796","3053640448","3036850922","2033582226","5131711820"],"user_address":{"street":"035 Evergreen Place","city":"Seattle","state":"Washington","postal_code":"98148"}}
{"user_id":3,"user_first_name":"Natal","user_last_name":"Cluff","user_email":"ncluff2@de.vu","user_gender":"Male","user_phone_numbers":["9172816696","8012386892","4053271574","8125342052"],"user_address":{"street":"647 Lake View Circle","city":"Des Moines","state":"Iowa","postal_code":"50936"}}
{"user_id":4,"user_first_name":"Pedro","user_last_name":"Riediger","user_

In [4]:
# Echo the `spark` session object as the cell output.
spark


In [5]:
# Import only the type classes the schema needs; a wildcard import hides where
# names come from and floods the notebook namespace.
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for the users JSON records: scalar user attributes, an array of
# phone-number strings, and a nested address struct.
users_schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("user_first_name", StringType(), nullable=False),
    StructField("user_last_name", StringType(), nullable=False),
    StructField("user_email", StringType(), nullable=False),
    StructField("user_gender", StringType(), nullable=False),
    StructField("user_phone_numbers", ArrayType(StringType()), nullable=True),
    StructField("user_address", StructType([
        StructField("street", StringType(), nullable=False),
        StructField("city", StringType(), nullable=False),
        StructField("state", StringType(), nullable=False),
        StructField("postal_code", StringType(), nullable=False),
    ]), nullable=False)
])

In [6]:
# Load every JSON file under /public/sms/users using the explicit schema.
df = (
    spark.read
    .format('json')
    .schema(users_schema)
    .load('/public/sms/users')
)

In [6]:
# Preview the first 20 rows of the loaded users DataFrame.
df.show(20)

+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+
|user_id|user_first_name|user_last_name|          user_email|user_gender|  user_phone_numbers|        user_address|
+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+
| 200001|         Eirena|     Cutsforth|ecutsforth0@wisc.edu|     Female|[4197404036, 9173...|{8 Warrior Drive,...|
| 200002|          Marja|      Shopcott|mshopcott1@hexun.com|     Female|[9542037028, 2128...|{66 Prairieview T...|
| 200003|           Dawn|       Tointon|  dtointon2@ucsd.edu|     Female|[9523035647, 2134...|{18 Ronald Regan ...|
| 200004|          Goldi|        Leaman|     gleaman3@360.cn|     Female|[2027069459, 7042...|{7696 Calypso Jun...|
| 200005|       Brewster|      Hallagan|bhallagan4@livejo...|       Male|[8134746319, 2152...|{942 Emmet Park, ...|
| 200006|       Florence|       Glashby|fglashby5@deviant...|     Female

In [7]:
# Number of partitions of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

3

In [8]:
# Total count of records in the DataFrame
df.count()


1000000

In [7]:
# Expose df to Spark SQL as the temporary view "users_9".
df.createOrReplaceTempView("users_9")

In [8]:
# Describe the users_9 view (column names and data types).
spark.sql("describe extended users_9")

col_name,data_type,comment
user_id,int,
user_first_name,string,
user_last_name,string,
user_email,string,
user_gender,string,
user_phone_numbers,array<string>,
user_address,struct<street:str...,


In [11]:
# List the tables/views whose name matches 'users_9'.
spark.sql("show tables like 'users_9'")

database,tableName,isTemporary
,users_9,True


In [12]:
# Name of the catalog's current database.
spark.catalog.currentDatabase()

'default'

In [51]:
# Row count of the users_9 view, computed through SQL.
spark.sql("select count(*) from users_9")

count(1)
1000000


In [14]:
# Extract columns from the nested JSON address and create views

In [9]:
from pyspark.sql.functions import col, size

# Flatten the nested user_address struct: each address field is copied into a
# top-level column while the original struct column is kept.
df_add = (
    df
    .withColumn("street", df["user_address"]["street"])
    .withColumn("city", df["user_address"]["city"])
    .withColumn("state", df["user_address"]["state"])
    .withColumn("postal_code", df["user_address"]["postal_code"])
)
df_add.show(5)

+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+---------------+----------+-----------+
|user_id|user_first_name|user_last_name|          user_email|user_gender|  user_phone_numbers|        user_address|              street|           city|     state|postal_code|
+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+--------------------+---------------+----------+-----------+
| 200001|         Eirena|     Cutsforth|ecutsforth0@wisc.edu|     Female|[4197404036, 9173...|{8 Warrior Drive,...|     8 Warrior Drive|         Dallas|     Texas|      75358|
| 200002|          Marja|      Shopcott|mshopcott1@hexun.com|     Female|[9542037028, 2128...|{66 Prairieview T...|66 Prairieview Te...|         Joliet|  Illinois|      60435|
| 200003|           Dawn|       Tointon|  dtointon2@ucsd.edu|     Female|[9523035647, 2134...|{18 Ronald Regan ...|18 Ro

In [16]:
# Number of rows whose flattened state column equals 'New York'.
df_add.filter(col('state') == 'New York').count()

49576

In [17]:
# Same filter, counting only distinct rows.
df_add.filter(df_add.state == 'New York').distinct().count()

49576

In [10]:
# Register the flattened DataFrame as the temporary view "users_add".
df_add.createOrReplaceTempView("users_add")

In [56]:
# Distinct user_id count for state = 'New York', via SQL on users_add.
spark.sql("select count(distinct(user_id)) from users_add where state = 'New York'")

count(DISTINCT user_id)
49576


In [58]:
# State with the highest number of distinct postal codes (top 1 only).
spark.sql("select state,count(distinct(postal_code)) as cp from users_add group by state order by cp desc limit 1")

state,cp
California,206


In [11]:
from pyspark.sql.functions import col, countDistinct

# DataFrame-API version of the query: count distinct postal codes per state,
# sort descending by that count and show only the top row.
(
    df_add
    .groupBy("state")
    .agg(countDistinct("postal_code").alias('cp'))
    .orderBy("cp", ascending=False)
    .limit(1)
    .show()
)
        


+----------+---+
|     state| cp|
+----------+---+
|California|206|
+----------+---+



In [76]:
# City (null cities excluded) with the highest number of distinct users.
spark.sql("select city,count(distinct(user_id)) as cp from users_add where city is not null group by city order by cp desc limit 1")

city,cp
Washington,28504


In [12]:
# DataFrame-API version: drop rows with a null city, count distinct users per
# city, sort descending by that count and show only the top row.
(
    df_add
    .filter(df_add['city'].isNotNull())
    .groupBy("city")
    .agg(countDistinct("user_id").alias('cp'))
    .orderBy("cp", ascending=False)
    .limit(1)
    .show()
)

+----------+-----+
|      city|   cp|
+----------+-----+
|Washington|28504|
+----------+-----+



In [61]:
# Distinct users whose email string ends with 'biz' (LIKE '%biz').
spark.sql("select count(distinct(user_id)) as cp from users_add where user_email like '%biz'")

cp
2015


In [13]:
from pyspark.sql.functions import col, countDistinct, split

# NOTE: despite its name, df_ph_table here is a one-row aggregate — the number
# of distinct emails whose second '@'-separated segment is 'bizjournals.com'.
df_ph_table = (
    df_add
    .filter(split(df_add['user_email'], "@")[1] == 'bizjournals.com')
    .agg(countDistinct("user_email"))
)

In [14]:
from pyspark.sql.functions import col, size

# Register a temp view named "df_ph_table": df_add plus a no_of_phone_numbers
# column holding size() of the phone-number array.
(
    df_add
    .withColumn("no_of_phone_numbers", size(col("user_phone_numbers")))
    .createOrReplaceTempView("df_ph_table")
)

In [11]:
# Distinct users with exactly 4 phone numbers (reads the df_ph_table temp view).
spark.sql("select count(distinct(user_id)) from df_ph_table where no_of_phone_numbers = 4")

count(DISTINCT user_id)
179041


In [17]:
# Echo the df_ph_table Python variable as the cell output.
df_ph_table

count(user_email)
2015


In [49]:
# Rebind df_ph_table to df_add plus a no_of_phone_numbers column
# (size of the user_phone_numbers array).
df_ph_table = df_add.withColumn("no_of_phone_numbers", size(col("user_phone_numbers")))

In [50]:

# Distinct users with exactly 4 phone numbers, via the DataFrame API.
df_ph_table.filter(col('no_of_phone_numbers') == 4).agg(countDistinct("user_id")).show()

+--------------+
|count(user_id)|
+--------------+
|        179041|
+--------------+



In [56]:
# Distinct users whose user_phone_numbers array is null, via SQL.
spark.sql("select count(distinct(user_id)) from df_ph_table where user_phone_numbers is null")

count(DISTINCT user_id)
108981


In [58]:
# Distinct users whose user_phone_numbers array is null, via the DataFrame API.
# Expects df_ph_table to be the DataFrame that carries the no_of_phone_numbers column.
df_ph_table.filter(col('user_phone_numbers').isNull()).agg(countDistinct("user_id")).show()

+--------------+
|count(user_id)|
+--------------+
|        108981|
+--------------+



In [55]:
# Rows of the df_ph_table view whose phone-number count is exactly 0.
spark.sql("select * from df_ph_table where no_of_phone_numbers = 0")

user_id,user_first_name,user_last_name,user_email,user_gender,user_phone_numbers,user_address,street,city,state,postal_code,no_of_phone_numbers


In [61]:
# Preview the first 10 rows of df.
df.show(10)

+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+
|user_id|user_first_name|user_last_name|          user_email|user_gender|  user_phone_numbers|        user_address|
+-------+---------------+--------------+--------------------+-----------+--------------------+--------------------+
| 200001|         Eirena|     Cutsforth|ecutsforth0@wisc.edu|     Female|[4197404036, 9173...|{8 Warrior Drive,...|
| 200002|          Marja|      Shopcott|mshopcott1@hexun.com|     Female|[9542037028, 2128...|{66 Prairieview T...|
| 200003|           Dawn|       Tointon|  dtointon2@ucsd.edu|     Female|[9523035647, 2134...|{18 Ronald Regan ...|
| 200004|          Goldi|        Leaman|     gleaman3@360.cn|     Female|[2027069459, 7042...|{7696 Calypso Jun...|
| 200005|       Brewster|      Hallagan|bhallagan4@livejo...|       Male|[8134746319, 2152...|{942 Emmet Park, ...|
| 200006|       Florence|       Glashby|fglashby5@deviant...|     Female

In [63]:
!hadoop fs -ls /user/itv011398/data


Found 2 items
drwxr-xr-x   - itv011398 supergroup          0 2024-04-16 14:37 /user/itv011398/data/input
drwxr-xr-x   - itv011398 supergroup          0 2024-03-13 19:23 /user/itv011398/data/output


In [65]:
# Persist df as Parquet under /user/itv011398/data/wk_9, overwriting any
# existing output at that path.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "/user/itv011398/data/wk_9")
    .save()
)

In [66]:
!hadoop fs -ls /user/itv011398/data/wk_9

Found 4 items
-rw-r--r--   3 itv011398 supergroup          0 2024-10-08 17:04 /user/itv011398/data/wk_9/_SUCCESS
-rw-r--r--   3 itv011398 supergroup   27109499 2024-10-08 17:04 /user/itv011398/data/wk_9/part-00000-cfdd27f9-88ec-4810-9905-a9280c1a32d3-c000.snappy.parquet
-rw-r--r--   3 itv011398 supergroup   27112872 2024-10-08 17:04 /user/itv011398/data/wk_9/part-00001-cfdd27f9-88ec-4810-9905-a9280c1a32d3-c000.snappy.parquet
-rw-r--r--   3 itv011398 supergroup   13778332 2024-10-08 17:04 /user/itv011398/data/wk_9/part-00002-cfdd27f9-88ec-4810-9905-a9280c1a32d3-c000.snappy.parquet


In [67]:
# Number of partitions of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

4

In [15]:
# Show the contents of whatever df_ph_table currently refers to.
df_ph_table.show()

+-----------------+
|count(user_email)|
+-----------------+
|             2015|
+-----------------+



In [29]:
# Explicit imports so this cell no longer depends on names that happen to have
# been imported by earlier cells.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, size

# Session bootstrap, intentionally left disabled (it used to be parked inside a
# bare triple-quoted string). The warehouse path is written as an f-string so
# that {username} is actually interpolated if this is ever re-enabled.
#
# import getpass
# from pyspark.sql import SparkSession
#
# username = getpass.getuser()
# spark = SparkSession. \
#     builder. \
#     config('spark.shuffle.useOldFetchProtocol', 'true'). \
#     config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
#     enableHiveSupport(). \
#     master('yarn'). \
#     getOrCreate()

# Schema for reading the users JSON files.
users_schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("user_first_name", StringType(), nullable=False),
    StructField("user_last_name", StringType(), nullable=False),
    StructField("user_email", StringType(), nullable=False),
    StructField("user_gender", StringType(), nullable=False),
    StructField("user_phone_numbers", ArrayType(StringType()), nullable=True),
    StructField("user_address", StructType([
        StructField("street", StringType(), nullable=False),
        StructField("city", StringType(), nullable=False),
        StructField("state", StringType(), nullable=False),
        StructField("postal_code", StringType(), nullable=False),
    ]), nullable=False)
])

# Read every JSON file under /public/sms/users/ with the explicit schema.
users_df = (
    spark.read
    .format("json")
    .schema(users_schema)
    .load("/public/sms/users/")
)

# Flatten the address struct, add the phone-number array size, and expose the
# result to Spark SQL as the temporary view "users_vw".
(
    users_df
    .withColumn("user_street", col("user_address.street"))
    .withColumn("user_city", col("user_address.city"))
    .withColumn("user_state", col("user_address.state"))
    .withColumn("user_postal_code", col("user_address.postal_code"))
    .withColumn("num_phn_numbers", size(col("user_phone_numbers")))
    .createOrReplaceTempView("users_vw")
)

In [30]:
# Per-state counts of male and female users, restricted to rows that have both
# a state and a non-null phone-number array; displayed with show().
spark.sql("""
    SELECT user_state,
           COUNT(CASE WHEN user_gender = 'Male' THEN user_id END) AS Male_cnt,
           COUNT(CASE WHEN user_gender = 'Female' THEN user_id END) AS Female_cnt
    FROM users_vw
    WHERE user_state IS NOT NULL 
          AND user_phone_numbers IS NOT NULL
    GROUP BY user_state
""").show()


+--------------------+--------+----------+
|          user_state|Male_cnt|Female_cnt|
+--------------------+--------+----------+
|                Utah|    4073|      4171|
|              Hawaii|    2172|      2062|
|           Minnesota|    9371|      9250|
|                Ohio|   16322|     16239|
|              Oregon|    3899|      3841|
|            Arkansas|    2420|      2416|
|               Texas|   48786|     48450|
|        North Dakota|     981|       940|
|        Pennsylvania|   14270|     14237|
|         Connecticut|    5797|      5917|
|            Nebraska|    3501|      3688|
|             Vermont|     227|       237|
|              Nevada|    6317|      6495|
|          Washington|    8812|      8755|
|            Illinois|   11178|     11267|
|            Oklahoma|    6888|      6913|
|District of Columbia|   14212|     14292|
|            Delaware|    1651|      1654|
|              Alaska|    1882|      1938|
|          New Mexico|    2804|      2745|
+----------

In [33]:
# 5. State-wise gender count, written out as CSV.
#
# ORDER BY user_state makes the persisted row order deterministic; without it
# the order of the written rows depends on how the GROUP BY shuffle happens to
# partition the states.
users_final_df = spark.sql("""
    SELECT user_state,
           COUNT(CASE WHEN user_gender = 'Male' THEN user_id END) AS Male_cnt,
           COUNT(CASE WHEN user_gender = 'Female' THEN user_id END) AS Female_cnt
    FROM users_vw
    WHERE user_state IS NOT NULL
          AND user_phone_numbers IS NOT NULL
    GROUP BY user_state
    ORDER BY user_state
""")

# Overwrite any previous result at the target path.
users_final_df.write.format("csv").mode("overwrite").option("path","/user/itv011398/pivot_assignment_result").save()

# Stop the session only after the write has finished.
# NOTE(review): a `NoSuchElementException: None.get` raised from save() is
# consistent with this cell being re-run after the session was already stopped —
# re-create the session and the users_vw view before running it again.
spark.stop()

Py4JJavaError: An error occurred while calling o322.save.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:175)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics(DataWritingCommand.scala:51)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics$(DataWritingCommand.scala:51)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics$lzycompute(InsertIntoHadoopFsRelationCommand.scala:49)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics(InsertIntoHadoopFsRelationCommand.scala:49)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics(commands.scala:104)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:63)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:101)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)


In [34]:
!hadoop fs -ls /user/itv011398/pivot_assignment_result

Found 52 items
-rw-r--r--   3 itv011398 supergroup          0 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/_SUCCESS
-rw-r--r--   3 itv011398 supergroup         18 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/part-00000-35755110-9b26-43cf-9426-5791e7571ed6-c000.csv
-rw-r--r--   3 itv011398 supergroup         17 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/part-00001-35755110-9b26-43cf-9426-5791e7571ed6-c000.csv
-rw-r--r--   3 itv011398 supergroup         18 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/part-00002-35755110-9b26-43cf-9426-5791e7571ed6-c000.csv
-rw-r--r--   3 itv011398 supergroup         19 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/part-00003-35755110-9b26-43cf-9426-5791e7571ed6-c000.csv
-rw-r--r--   3 itv011398 supergroup         23 2024-10-08 23:45 /user/itv011398/pivot_assignment_result/part-00004-35755110-9b26-43cf-9426-5791e7571ed6-c000.csv
-rw-r--r--   3 itv011398 supergroup         21 2024-10-08 23:45 /us

In [37]:
!hadoop fs -cat /user/itv011398/pivot_assignment_result/*

Alabama,9307,9178
Alaska,1882,1938
Arizona,9406,9543
Arkansas,2420,2416
California,49120,48716
Colorado,10128,10125
Connecticut,5797,5917
Delaware,1651,1654
District of Columbia,14212,14292
Florida,36692,36688
Georgia,13008,13028
Hawaii,2172,2062
Idaho,2058,2101
Illinois,11178,11267
Indiana,9604,9676
Iowa,4706,4726
Kansas,5962,5776
Kentucky,6216,6108
Louisiana,8706,8631
Maine,225,228
Maryland,5707,5797
Massachusetts,6664,6610
Michigan,8514,8625
Minnesota,9371,9250
Mississippi,2681,2599
Missouri,9307,9547
Montana,1225,1165
Nebraska,3501,3688
Nevada,6317,6495
New Hampshire,754,722
New Jersey,4268,4302
New Mexico,2804,2745
New York,25078,24498
North Carolina,10896,10909
North Dakota,981,940
Ohio,16322,16239
Oklahoma,6888,6913
Oregon,3899,3841
Pennsylvania,14270,14237
Rhode Island,462,469
South Carolina,4864,4793
South Dakota,1206,1237
Tennessee,8832,8813
Texas,48786,48450
Utah,4073,4171
Vermont,227,237
Virginia,15849,15607
Washington,8812,8755
West Virginia,4173,4281
Wisconsin,4649,4744
W