In [1]:
import requests
from json import dump

# Fetch the sample user records and cache them on disk so Spark can read them later.
# The timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() stops an HTTP error body from being saved as if it were data.
http_response = requests.get("https://jsonplaceholder.typicode.com/users", timeout=30)
http_response.raise_for_status()
response = http_response.json()  # decoded JSON payload, same value the name held before

# Plain "w" is sufficient: the file is only written through this handle, never read.
with open("apiData.json", "w") as f:
    dump(response, f)

In [2]:
from pyspark.sql import SparkSession
# Obtain the SparkSession used by the rest of the notebook: getOrCreate() hands back
# an already-active session if there is one and otherwise starts a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

24/11/13 10:35:56 WARN Utils: Your hostname, hcode-ThinkPad-E14-Gen-3 resolves to a loopback address: 127.0.1.1; using 192.168.1.41 instead (on interface wlp3s0)
24/11/13 10:35:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 10:35:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the cached API response into a DataFrame; Spark infers the schema from the JSON.
df = spark.read.format("json").load("apiData.json")

In [4]:
# Print the DataFrame's schema as an indented tree (column names, types, nullability).
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- lat: string (nullable = true)
 |    |    |-- lng: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- suite: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- bs: string (nullable = true)
 |    |-- catchPhrase: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- username: string (nullable = true)
 |-- website: string (nullable = true)



In [5]:
# Promote three nested values to top-level columns and drop them from their structs:
#   address.city     -> cityName
#   company.name     -> companyName
#   address.geo.lat  -> latitude
from pyspark.sql.functions import col, struct

# Aliased copies of the nested values, appended after the existing columns.
promoted_columns = [
    col("address.city").alias("cityName"),
    col("company.name").alias("companyName"),
    col("address.geo.lat").alias("latitude"),
]

# "address" rebuilt without city, and with a geo struct that keeps only lng.
address_without_promoted = struct(
    col("address.street"),
    col("address.suite"),
    col("address.zipcode"),
    struct(col("address.geo.lng")).alias("geo"),
)

# "company" rebuilt without name.
company_without_promoted = struct(
    col("company.bs"),
    col("company.catchPhrase"),
)

# withColumn on an existing name replaces that column in place, so the original
# column order is preserved and the promoted columns stay at the end.
df = (
    df.select(*df.columns, *promoted_columns)
    .withColumn("address", address_without_promoted)
    .withColumn("company", company_without_promoted)
)

df.printSchema()

root
 |-- address: struct (nullable = false)
 |    |-- street: string (nullable = true)
 |    |-- suite: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |    |-- geo: struct (nullable = false)
 |    |    |-- lng: string (nullable = true)
 |-- company: struct (nullable = false)
 |    |-- bs: string (nullable = true)
 |    |-- catchPhrase: string (nullable = true)
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- username: string (nullable = true)
 |-- website: string (nullable = true)
 |-- cityName: string (nullable = true)
 |-- companyName: string (nullable = true)
 |-- latitude: string (nullable = true)



In [6]:
# Keep only the rows whose companyName contains the substring "Crona".
# Column.contains is a case-sensitive match; lower-case both sides first if a
# case-insensitive match is wanted.
# show() only prints and returns None, so the filtered DataFrame is bound to df_
# first and displayed as a separate step.
df_ = df.filter(condition=df["companyName"].contains("Crona"))
df_.show()

+--------------------+--------------------+-----------------+---+-------------+--------------------+--------+-------------+-----------+---------------+--------+
|             address|             company|            email| id|         name|               phone|username|      website|   cityName|    companyName|latitude|
+--------------------+--------------------+-----------------+---+-------------+--------------------+--------+-------------+-----------+---------------+--------+
|{Kulas Light, Apt...|{harness real-tim...|Sincere@april.biz|  1|Leanne Graham|1-770-736-8031 x5...|    Bret|hildegard.org|Gwenborough|Romaguera-Crona|-37.3159|
+--------------------+--------------------+-----------------+---+-------------+--------------------+--------+-------------+-----------+---------------+--------+



In [7]:
# Count the rows for each distinct cityName and print the resulting table.
rows_per_city = df.groupBy("cityName").count()
rows_per_city.show()

+--------------+-----+
|      cityName|count|
+--------------+-----+
|Bartholomebury|    1|
|   Gwenborough|    1|
|   South Elvis|    1|
|     Aliyaview|    1|
|   Wisokyburgh|    1|
| McKenziehaven|    1|
|    Roscoeview|    1|
|   Lebsackbury|    1|
| South Christy|    1|
|     Howemouth|    1|
+--------------+-----+



In [11]:
from json import dump, loads

# toJSON() produces one JSON-encoded string per row. Decode each string before
# dumping; otherwise the file holds an array of escaped strings (double-encoded
# JSON) instead of an array of objects.
decoded_rows = [loads(row_json) for row_json in df.toJSON().collect()]

# Plain "w" is sufficient: the file is only written through this handle.
with open("modifiedApiData2.json", "w") as f:
    dump(decoded_rows, f, indent=2)


In [None]:
from json import dump, loads

# toJSON() produces one JSON-encoded string per row; decode each into a dict so the
# file written below is an array of objects. `data` is kept as a real list (not a
# one-shot map iterator) so it is still usable after the dump.
data = [loads(row_json) for row_json in df.toJSON().collect()]

# Plain "w" is sufficient: the file is only written through this handle.
with open("modifiedApiData.json", "w") as f:
    dump(data, f)

In [9]:
# Round-trip check: load the exported file back into Spark and print the rows
# with full (untruncated) cell contents.
test_json = spark.read.json(path="modifiedApiData.json")
test_json.show(n=20, truncate=False)

+------------------------------------------------------+--------------+--------------------------------------------------------------------------------+------------------+-------------------------+---+--------+------------------------+---------------------+----------------+-------------+
|address                                               |cityName      |company                                                                         |companyName       |email                    |id |latitude|name                    |phone                |username        |website      |
+------------------------------------------------------+--------------+--------------------------------------------------------------------------------+------------------+-------------------------+---+--------+------------------------+---------------------+----------------+-------------+
|{{81.1496}, Kulas Light, Apt. 556, 92998-3874}        |Gwenborough   |{harness real-time e-markets, Multi-layered client-server neur