In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

import requests
import json

# Initialize Spark session
spark = SparkSession.builder.appName("Fetch and Flatten JSON").getOrCreate()

# Fetch data from API
api_url = "https://randomuser.me/api/?results=250"
# Without a timeout, requests waits indefinitely if the API stops responding,
# which would hang the whole notebook run.
response = requests.get(api_url, timeout=30)

# Fail fast on any non-200 answer, with a single readable message
# (passing two positional args to Exception would render as a tuple).
if response.status_code != 200:
    raise Exception(f"Error fetching data: HTTP {response.status_code}")

# Keep the raw JSON text; it is parsed by Spark in the next cell.
raw_data = response.text



In [None]:
# Create DataFrame from JSON string
df_raw = spark.read.json(spark.sparkContext.parallelize([raw_data]))

# One row per user: `results` is an array, so explode it into a `user` struct column.
df_exploded = df_raw.select(explode(col("results")).alias("user"))

# Column projections, grouped by topic. Each entry pulls one nested field out
# of the `user` struct and gives it a flat name.
personal_fields = [
    col("user.gender").alias("gender"),
    col("user.name.title").alias("title"),
    col("user.name.first").alias("first_name"),
    col("user.name.last").alias("last_name"),
    col("user.dob.date").alias("dob_date"),
    col("user.dob.age").alias("dob_age"),
    col("user.email").alias("email"),
    col("user.cell").alias("cell"),
    col("user.nat").alias("nationality"),
]

location_fields = [
    col("user.location.street.number").alias("street_number"),
    col("user.location.street.name").alias("street_name"),
    col("user.location.city").alias("city"),
    col("user.location.state").alias("state"),
    col("user.location.country").alias("country"),
    col("user.location.postcode").alias("postcode"),
    col("user.location.coordinates.latitude").alias("latitude"),
    col("user.location.coordinates.longitude").alias("longitude"),
    col("user.location.timezone.offset").alias("timezone_offset"),
    col("user.location.timezone.description").alias("timezone_desc"),
]

login_fields = [
    col("user.login.username").alias("username"),
    col("user.login.password").alias("password"),
]

picture_fields = [
    col("user.picture.large").alias("picture_large"),
    col("user.picture.medium").alias("picture_medium"),
]

# Per-topic views, kept for ad-hoc inspection (lazy, so they cost nothing
# until an action is run on them).
personal_info = df_exploded.select(*personal_fields)
location_info = df_exploded.select(*location_fields)
login_info = df_exploded.select(*login_fields)
picture_info = df_exploded.select(*picture_fields)

# Build the flat frame with ONE projection over df_exploded so that every
# output row comes from exactly one user. Joining the per-topic frames
# without a join key does not align rows: depending on the Spark version and
# configuration it either raises an analysis error or produces a Cartesian
# product of all four frames.
df = df_exploded.select(
    *personal_fields,
    *location_fields,
    *login_fields,
    *picture_fields,
)

df.printSchema()
print("Total Records:", df.count())


root
 |-- gender: string (nullable = true)
 |-- title: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- street_number: long (nullable = true)
 |-- street_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- timezone_offset: string (nullable = true)
 |-- timezone_desc: string (nullable = true)
 |-- email: string (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- dob_date: string (nullable = true)
 |-- dob_age: long (nullable = true)
 |-- cell: string (nullable = true)
 |-- picture_large: string (nullable = true)
 |-- picture_medium: string (nullable = true)
 |-- nationality: string (nullable = true)

Total Records: 250


In [None]:
# Preview the first 10 flattened rows.
# NOTE(review): `.display()` is presumably the Databricks DataFrame helper — confirm
# the target runtime. At this point `password` is still plain text (it is only
# hashed in a later cell), so clear this output before sharing the notebook.
raw_preview = df.limit(10)
raw_preview.display()

gender,title,first_name,last_name,street_number,street_name,city,state,country,postcode,latitude,longitude,timezone_offset,timezone_desc,email,username,password,dob_date,dob_age,cell,picture_large,picture_medium,nationality
female,Madame,Cynthia,Martinez,1632,Rue Paul Bert,Le Noirmont,Obwalden,Switzerland,5574,-63.1595,25.8699,-3:00,"Brazil, Buenos Aires, Georgetown",cynthia.martinez@example.com,happyladybug294,vinnie,1970-07-31,54,076 111 79 24,https://randomuser.me/api/portraits/women/61.jpg,https://randomuser.me/api/portraits/med/women/61.jpg,CH
male,Mr,Koray,Biçer,1834,Anafartalar Cd,Çanakkale,Yozgat,Turkey,14082,-80.9845,102.1973,+8:00,"Beijing, Perth, Singapore, Hong Kong",koray.bicer@example.com,yellowsnake572,shooter,1980-09-13,44,(782)-144-6110,https://randomuser.me/api/portraits/men/47.jpg,https://randomuser.me/api/portraits/med/men/47.jpg,TR
male,Mr,Quentin,Mathieu,8221,Avenue Tony-Garnier,Tours,Somme,France,17575,6.6358,-69.4659,+9:00,"Tokyo, Seoul, Osaka, Sapporo, Yakutsk",quentin.mathieu@example.com,angryostrich566,wonder,1964-04-14,60,06-10-16-05-81,https://randomuser.me/api/portraits/men/20.jpg,https://randomuser.me/api/portraits/med/men/20.jpg,FR
female,Ms,Rosa,Jørgensen,4884,Grønnevej,Roslev,Syddanmark,Denmark,92210,-21.4013,-105.3631,-2:00,Mid-Atlantic,rosa.jorgensen@example.com,angryrabbit125,madonna,1978-08-07,46,62411859,https://randomuser.me/api/portraits/women/61.jpg,https://randomuser.me/api/portraits/med/women/61.jpg,DK
female,Miss,Harper,Ambrose,9343,York St,Selkirk,Alberta,Canada,E8N 9N4,-38.9521,-118.981,+5:45,Kathmandu,harper.ambrose@example.com,lazygorilla500,gone,1992-08-05,32,W42 C42-6064,https://randomuser.me/api/portraits/women/48.jpg,https://randomuser.me/api/portraits/med/women/48.jpg,CA
male,Mr,Guneet,Acharya,7965,Chaman Ganj,Bellary,Punjab,India,12365,-63.7786,122.7727,+3:00,"Baghdad, Riyadh, Moscow, St. Petersburg",guneet.acharya@example.com,blackwolf377,1958,1967-12-04,57,9802738378,https://randomuser.me/api/portraits/men/60.jpg,https://randomuser.me/api/portraits/med/men/60.jpg,IN
female,Ms,Tracy,Patterson,1226,Queens Road,Bath,Tayside,United Kingdom,NJ1 6JP,-84.9295,-93.7224,-2:00,Mid-Atlantic,tracy.patterson@example.com,crazycat129,electric,1959-06-18,65,07742 163294,https://randomuser.me/api/portraits/women/1.jpg,https://randomuser.me/api/portraits/med/women/1.jpg,GB
female,Mrs,Asta,Rasmussen,4039,Morelvej,Roskilde,Danmark,Denmark,70150,68.9577,-34.158,+10:00,"Eastern Australia, Guam, Vladivostok",asta.rasmussen@example.com,tinymeercat164,maria,1965-04-08,59,16894548,https://randomuser.me/api/portraits/women/89.jpg,https://randomuser.me/api/portraits/med/women/89.jpg,DK
male,Mr,Logan,Moulin,931,Avenue du Château,Saint-Denis,Jura,France,85805,-69.6824,78.3884,-1:00,"Azores, Cape Verde Islands",logan.moulin@example.com,silverladybug716,broken,1997-11-23,27,06-03-03-23-04,https://randomuser.me/api/portraits/men/75.jpg,https://randomuser.me/api/portraits/med/men/75.jpg,FR
female,Ms,مرسانا,رضایی,1865,مالک اشتر,اصفهان,کرمانشاه,Iran,99229,-12.6735,-118.6423,0:00,"Western Europe Time, London, Lisbon, Casablanca",mrsn.rdyy@example.com,smallpanda951,nian,1986-04-10,38,0919-036-9775,https://randomuser.me/api/portraits/women/76.jpg,https://randomuser.me/api/portraits/med/women/76.jpg,IR


### Standardize Data Types

In [None]:
# Target Spark SQL type for every column that needs an explicit cast.
target_types = {
    "street_number": "int",
    "latitude": "double",
    "longitude": "double",
    "dob_date": "date",
    "dob_age": "int",
}

# Apply the casts one column at a time, replacing each column in place.
for column_name, spark_type in target_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))


In [None]:
# Cast `dob_date` to a Spark date. The previous cell already applies the same
# cast, so after a top-to-bottom run this statement leaves the column unchanged.
df = df.withColumn("dob_date", col("dob_date").cast("date"))


### Handle Missing Values

In [None]:
# Text columns that all share the same "N/A" placeholder.
na_placeholder_columns = [
    "street_name", "city", "state", "country",
    "timezone_offset", "timezone_desc",
    "email", "username", "password", "cell",
    "picture_large", "picture_medium", "nationality",
]
fill_values = dict.fromkeys(na_placeholder_columns, "N/A")

# Columns with their own default.
# NOTE(review): `postcode` is given the integer 0 and `dob_date` a string
# default — confirm how fillna handles a replacement whose type differs from
# the column type on the Spark version in use.
fill_values.update({
    "gender": "Unknown",
    "street_number": 0,
    "postcode": 0,
    "latitude": 0.0,
    "longitude": 0.0,
    "dob_date": "1900-01-01",
    "dob_age": 0,
})

# Replace nulls column by column using the mapping above.
df = df.fillna(fill_values)

### Normalize Data

In [None]:
# Strip surrounding whitespace and lower-case the categorical text columns.
for text_column in ("gender", "city", "state"):
    df = df.withColumn(text_column, lower(trim(col(text_column))))


### Hash Sensitive Fields

In [None]:
# Overwrite `password` with its SHA-256 digest (a one-way hash; no salt is applied here).
password_digest = sha2(col("password"), 256)
df = df.withColumn("password", password_digest)

### Add Derived Columns

#### Add Full Name

In [None]:
# Join title, first name and last name with single spaces into `full_name`.
name_parts = [col("title"), col("first_name"), col("last_name")]
df = df.withColumn("full_name", concat_ws(" ", *name_parts))

#### Calculate Age Group

In [None]:
# Bucket `dob_age`: under 18 -> "Minor", 18 to 59 -> "Adult", everything else -> "Senior".
age = col("dob_age")
age_bucket = (
    when(age < 18, "Minor")
    .when((age >= 18) & (age < 60), "Adult")
    .otherwise("Senior")
)
df = df.withColumn("age_group", age_bucket)


### Optimize Performance

In [None]:
# Redistribute rows so that all rows of a given country land in the same partition.
partition_key = col("country")
df = df.repartition(partition_key)

In [None]:
# Print the schema first and leave the row count as the cell's last expression
# so the notebook actually displays it. When count() came first, its result was
# discarded and the Spark job ran for nothing.
df.printSchema()
df.count()

root
 |-- gender: string (nullable = false)
 |-- title: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- street_number: integer (nullable = false)
 |-- street_name: string (nullable = false)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- country: string (nullable = false)
 |-- postcode: string (nullable = false)
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- timezone_offset: string (nullable = false)
 |-- timezone_desc: string (nullable = false)
 |-- email: string (nullable = false)
 |-- username: string (nullable = false)
 |-- password: string (nullable = true)
 |-- dob_date: date (nullable = true)
 |-- dob_age: integer (nullable = false)
 |-- cell: string (nullable = false)
 |-- picture_large: string (nullable = false)
 |-- picture_medium: string (nullable = false)
 |-- nationality: string (nullable = false)
 |-- full_name: string (nullable = false)
 |-- age_group: string (nullable = false)

In [None]:
# Desired output layout, grouped by topic. Group order and the order inside
# each group define the final column order.
column_groups = {
    "personal": [
        "title", "first_name", "last_name", "full_name", "gender",
        "dob_date", "dob_age", "age_group", "nationality",
    ],
    "contact": ["email", "cell", "username", "password"],
    "location": [
        "street_number", "street_name", "city", "state", "country",
        "postcode", "latitude", "longitude",
        # Uncomment the following line if you have created the geo_location column
        # "geo_location",
    ],
    "timezone": ["timezone_offset", "timezone_desc"],
    "media": ["picture_large", "picture_medium"],
}

# Flatten the groups into a single ordered list of column names.
new_column_order = [
    column_name
    for group_columns in column_groups.values()
    for column_name in group_columns
]

# Reorder the DataFrame; columns not listed above are dropped by the select.
df = df.select(*new_column_order)

df.printSchema()

# Row count as the last expression so the notebook displays it
df.count()


root
 |-- title: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- dob_date: date (nullable = true)
 |-- dob_age: integer (nullable = false)
 |-- age_group: string (nullable = false)
 |-- nationality: string (nullable = false)
 |-- email: string (nullable = false)
 |-- cell: string (nullable = false)
 |-- username: string (nullable = false)
 |-- password: string (nullable = true)
 |-- street_number: integer (nullable = false)
 |-- street_name: string (nullable = false)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- country: string (nullable = false)
 |-- postcode: string (nullable = false)
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- timezone_offset: string (nullable = false)
 |-- timezone_desc: string (nullable = false)
 |-- picture_large: string (nullable = false)
 |-- picture_medium: string (nullable = false)

In [None]:
# Preview the first 10 rows of the cleaned, reordered frame.
# NOTE(review): `.display()` is presumably the Databricks DataFrame helper — confirm the target runtime.
clean_preview = df.limit(10)
clean_preview.display()

title,first_name,last_name,full_name,gender,dob_date,dob_age,age_group,nationality,email,cell,username,password,street_number,street_name,city,state,country,postcode,latitude,longitude,timezone_offset,timezone_desc,picture_large,picture_medium
Mr,Koray,Biçer,Mr Koray Biçer,male,1980-09-13,44,Adult,TR,koray.bicer@example.com,(782)-144-6110,yellowsnake572,c3e8e999bf52bb9aa3309c8e81233f1815029e6067c3c534d6eccebe3650e737,1834,Anafartalar Cd,çanakkale,yozgat,Turkey,14082,-80.9845,102.1973,+8:00,"Beijing, Perth, Singapore, Hong Kong",https://randomuser.me/api/portraits/men/47.jpg,https://randomuser.me/api/portraits/med/men/47.jpg
Mrs,Melike,Aykaç,Mrs Melike Aykaç,female,1953-11-11,71,Senior,TR,melike.aykac@example.com,(470)-257-0969,heavyswan529,79082e07de34a101868e209caa4c29ca8c296923edfaf203e22b8fbbab744f96,8941,Filistin Cd,eskişehir,konya,Turkey,15513,-72.7591,-95.9688,-5:00,"Eastern Time (US & Canada), Bogota, Lima",https://randomuser.me/api/portraits/women/82.jpg,https://randomuser.me/api/portraits/med/women/82.jpg
Mr,Efe,Süleymanoğlu,Mr Efe Süleymanoğlu,male,1951-04-04,73,Senior,TR,efe.suleymanoglu@example.com,(465)-416-8916,goldenmeercat225,8849853b957fe153b7056d0e7d65f99fb21070daf5122ddf1d7c942d4643c33d,2239,Şehitler Cd,adıyaman,sinop,Turkey,85656,29.1194,-99.4952,-4:00,"Atlantic Time (Canada), Caracas, La Paz",https://randomuser.me/api/portraits/men/44.jpg,https://randomuser.me/api/portraits/med/men/44.jpg
Mr,Ahmet,Akal,Mr Ahmet Akal,male,1945-12-02,79,Senior,TR,ahmet.akal@example.com,(938)-441-5421,organicleopard340,faa2a9bdcc837728fffdebbca0ada344aca67ebe6aa54836d298377820590de2,4253,Tunalı Hilmi Cd,elazığ,tokat,Turkey,61559,16.132,-120.5766,+9:00,"Tokyo, Seoul, Osaka, Sapporo, Yakutsk",https://randomuser.me/api/portraits/men/74.jpg,https://randomuser.me/api/portraits/med/men/74.jpg
Ms,Meral,Fahri,Ms Meral Fahri,female,1988-12-04,36,Adult,TR,meral.fahri@example.com,(872)-272-1581,bigsnake420,8382b57bf3739ad6689401dc5ad2c2c0ee86f7431cc90975df55e74bd17dfad7,1014,Mevlana Cd,bitlis,ardahan,Turkey,67304,38.1334,21.9104,-10:00,Hawaii,https://randomuser.me/api/portraits/women/76.jpg,https://randomuser.me/api/portraits/med/women/76.jpg
Mr,Mehmet,Hakyemez,Mr Mehmet Hakyemez,male,2001-04-13,23,Adult,TR,mehmet.hakyemez@example.com,(641)-458-3044,bigbird998,308a173ee6765335a4b104cd7bbdbf3b23aaced3ed27cf9557a2e0a9749b4317,2946,Atatürk Sk,ardahan,samsun,Turkey,15020,0.3486,86.3735,+3:30,Tehran,https://randomuser.me/api/portraits/men/97.jpg,https://randomuser.me/api/portraits/med/men/97.jpg
Miss,Deniz,Yıldızoğlu,Miss Deniz Yıldızoğlu,female,1959-02-24,65,Senior,TR,deniz.yildizoglu@example.com,(875)-029-7378,brownpanda612,16483ee69d8740609947417601985451fce674f30777c24b2227b35ea3cbeb34,8411,Necatibey Cd,giresun,niğde,Turkey,22599,-3.6132,-148.7995,-11:00,"Midway Island, Samoa",https://randomuser.me/api/portraits/women/91.jpg,https://randomuser.me/api/portraits/med/women/91.jpg
Mrs,Nurdan,Ekşioğlu,Mrs Nurdan Ekşioğlu,female,1955-06-04,69,Senior,TR,nurdan.eksioglu@example.com,(581)-100-4101,happyfish118,4e54907646fe2aa03e2ff2fd31f33efae134ec52c52b37df831c3ab6c097161b,8887,Bağdat Cd,bingöl,şırnak,Turkey,90883,-5.1778,-179.0948,-5:00,"Eastern Time (US & Canada), Bogota, Lima",https://randomuser.me/api/portraits/women/43.jpg,https://randomuser.me/api/portraits/med/women/43.jpg
Mr,Esat,Keçeci,Mr Esat Keçeci,male,1963-05-03,61,Senior,TR,esat.kececi@example.com,(068)-919-7814,silverbird523,35602208e86ac7d6b3a63780a9538a9d1763a646d5b9f3930a0548e0983e0ca6,400,Talak Göktepe Cd,bartın,giresun,Turkey,22361,-65.118,-100.6295,+8:00,"Beijing, Perth, Singapore, Hong Kong",https://randomuser.me/api/portraits/men/28.jpg,https://randomuser.me/api/portraits/med/men/28.jpg
Mr,Ahmet,Limoncuoğlu,Mr Ahmet Limoncuoğlu,male,1999-03-01,25,Adult,TR,ahmet.limoncuoglu@example.com,(153)-376-3816,greenrabbit262,104aecd611f970b35d4954aad758cb4029fa93e98302d7943181b052e21b4aad,8628,Doktorlar Cd,rize,bingöl,Turkey,18272,10.2331,-23.9348,+1:00,"Brussels, Copenhagen, Madrid, Paris",https://randomuser.me/api/portraits/men/65.jpg,https://randomuser.me/api/portraits/med/men/65.jpg


In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
import boto3
import os
import shutil

# Initialize Spark session for local machine.
# getOrCreate() hands back the already-running session when there is one.
spark = SparkSession.builder \
    .appName("Local PySpark CSV Export") \
    .master("local[*]") \
    .getOrCreate()

# Generate filename (timestamped so successive uploads do not collide in S3)
filename = f"random_users_{datetime.now().strftime('%d_%m_%Y_%H_%M_%S')}.csv"

# Local temporary path
local_output_path = "../csv"

# S3 destination
s3_bucket = "user-api-data-databricks"
s3_key = f"csv/{filename}"

# Write DataFrame to local path.
# coalesce(1) forces a single part file; "overwrite" replaces whatever the
# output directory held before this run.
df.coalesce(1) \
    .write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(local_output_path)

# Find and rename the part file locally. Spark names its output "part-*";
# fail loudly if none is present instead of attempting to upload a missing file.
part_files = [name for name in os.listdir(local_output_path) if name.startswith("part-")]
if not part_files:
    raise FileNotFoundError(f"No Spark part file found in {local_output_path!r}")

local_csv_path = os.path.join(local_output_path, filename)
os.rename(os.path.join(local_output_path, part_files[0]), local_csv_path)

# Upload the renamed CSV file to S3.
# Credentials are deliberately NOT written in the notebook: boto3 resolves them
# from its default chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
# variables, the shared credentials file, or an attached IAM role).
# SECURITY: the access key that used to be embedded in this cell must be
# treated as compromised and rotated.
s3 = boto3.client("s3")

# The upload source is the renamed file under local_output_path.
s3.upload_file(local_csv_path, s3_bucket, s3_key)


print("Single CSV file written to S3 successfully!")


Single CSV file written to S3 successfully!
