In [1]:
import time

import requests
from user_agents import parse
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, lit, split, hour, when, coalesce, udf


# Build (or reuse) the notebook's Spark session. The PostgreSQL JDBC driver is
# requested through spark.jars.packages.
_session_builder = SparkSession.builder.appName("ExtractJob")
_session_builder = _session_builder.config(
    'spark.jars.packages', "org.postgresql:postgresql:42.7.3"
)
spark = _session_builder.getOrCreate()



In [2]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
# Small in-memory sample; each tuple is
# (firstname, middlename, lastname, id, gender, salary).
data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Every column is a nullable string except the trailing integer salary.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("firstname", "middlename", "lastname", "id", "gender")
    ]
    + [StructField("salary", IntegerType(), True)]
)

df = spark.createDataFrame(data2, schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [8]:
# Bring the firstname column to the driver and flatten the single-field rows
# into a plain list of strings (shown as the cell output).
ids = df.select('firstname').collect()
id_list_str = [str(first_name) for (first_name,) in ids]
id_list_str

['James', 'Michael', 'Robert', 'Maria', 'Jen']

# TODO: read https://www.machinelearningplus.com/pyspark/run-sql-queries-with-pyspark/ (running SQL queries with PySpark)


In [34]:
# host = 'postgres'
# port = 5432
# dbname = 'postgres'
# login = 'airflow'
# password = 'airflow'

# url = f"jdbc:postgresql://{host}:{port}/{dbname}"
# properties = {
#     "user": login,
#     "password": password,
#     "driver": "org.postgresql.Driver"
# }

# url = f"jdbc:postgresql://postgres:5432/postgres"
# connection_properties = {
#     "user": "airflow",
#     "password": "airflow",
#     "driver": "org.postgresql.Driver"
# }
#
# query = f"(SELECT * FROM logs WHERE status = 'uploaded' AND DATE(created_at) = '{datetime.now().strftime('%Y-%m-%d')}') AS temp_table"
# df = spark.read \
#     .format("jdbc") \
#     .option("url", url) \
#     .option("dbtable", query) \
#     .option("user", connection_properties["user"]) \
#     .option("password", connection_properties["password"]) \
#     .option("driver", connection_properties["driver"]) \
#     .load()
#
# df.show(truncate=False)

#
# df = spark.read.format("jdbc").options(
#     url=f"jdbc:postgresql://postgres:5432/postgres",
#     driver="org.postgresql.Driver",
#     dbtable='logs',
#     user='airflow',
#     password='airflow'
# ).load()
#
# df.show(truncate=False)

In [43]:
@udf(returnType=StringType())
def get_location_by_ip(ip_address):
    """Look up the location of an IP address through the ipapi.co JSON API.

    Args:
        ip_address: IP address as a string. May be None when the source
            column is null.

    Returns:
        A ``"country, city, region"`` string. Any part the service does not
        provide is ``"Unknown"``; if the lookup fails entirely the result is
        ``"Unknown, Unknown, Unknown"``. The function never raises.
    """
    unknown_location = 'Unknown, Unknown, Unknown'

    # A null/empty address cannot be looked up; skip the network round trip.
    if not ip_address:
        return unknown_location

    # Short pause before every request (presumably to throttle calls to the API).
    time.sleep(0.01)
    try:
        # Without a timeout a stalled connection would block the Spark task
        # indefinitely.
        response = requests.get(f"http://ipapi.co/{ip_address}/json/", timeout=5)

        # Check the status before parsing: an error response is not
        # guaranteed to carry a JSON body.
        if response.status_code != 200:
            print(f"Error retrieving data for IP: {ip_address}")
            return unknown_location

        data = response.json()

        # `or` also covers keys that are present with a null value, which
        # would otherwise end up as the literal string "None".
        country = data.get("country_name") or "Unknown"
        city = data.get("city") or "Unknown"
        region = data.get("region") or "Unknown"
        print(f"IP Address: {ip_address}")
        print(f"Country: {country}")
        print(f"City: {city}")
        print(f"Region: {region}")
        return f'{country}, {city}, {region}'

    except Exception as e:
        # Deliberately best-effort: one failed lookup must not fail the job.
        print(f"An error occurred: {e}")
        return unknown_location

@udf(returnType=StringType())
def categorize_age_group(age: int) -> str:
    """Map an age to its age-group label.

    Args:
        age: Age in years; may be None.

    Returns:
        ``'None'`` for a missing (or zero) age, otherwise the label of the
        first bracket whose lower bound the age reaches, falling back to
        ``'1-18'``.
    """
    if not age:
        return 'None'

    # (inclusive lower bound, label), checked from the oldest bracket down.
    brackets = (
        (60, '60+'),
        (41, '41-60'),
        (26, '26-40'),
        (18, '18-25'),
    )
    for lower_bound, label in brackets:
        if age >= lower_bound:
            return label
    return '1-18'

@udf(returnType=StringType())
def extract_log_info(user_agent_str):
    """Summarise a user-agent string as ``"device_type, os_family, browser_family"``.

    Args:
        user_agent_str: Raw user-agent string; may be None when the source
            column is null.

    Returns:
        The comma-separated summary, or None when no user-agent string is
        available. The device type is "Tablet", "Mobile", or "Laptop" for
        everything else.
    """
    # parse() expects a string. An exception raised inside a UDF fails the
    # whole Spark job, so pass nulls through instead of parsing them.
    if user_agent_str is None:
        return None

    user_agent = parse(user_agent_str)

    if user_agent.is_tablet:
        device_type = "Tablet"
    elif user_agent.is_mobile:
        device_type = "Mobile"
    else:
        device_type = "Laptop"
    os_family = user_agent.os.family
    browser_family = user_agent.browser.family

    return f"{device_type}, {os_family}, {browser_family}"


In [44]:
# Column layout of data.csv in file order, as (name, Spark type) pairs.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("username", StringType()),
        ("age", IntegerType()),
        ("gender", StringType()),
        ("ad_position", StringType()),
        ("browsing_history", StringType()),
        ("activity_time", TimestampType()),
        ("ip_address", StringType()),
        ("log", StringType()),
        ("redirect_from", StringType()),
        ("redirect_to", StringType()),
        ("id", StringType()),
        ("created_at", TimestampType()),
        ("status", StringType()),
    )
])

# CSV reader options: the first line is a header, fields are comma-separated.
_options = dict(header="true", delimiter=",")

# Read with the explicit schema above.
df: DataFrame = spark.read.options(**_options).schema(schema).csv("data.csv")

# df.show(3, truncate=False)

In [53]:
# Enrich the raw rows with location, age group, time of day and user-agent
# details, then drop the helper and bookkeeping columns.
#
# Both UDFs return their parts joined with ", " (comma + space), so the split
# pattern must be ", " as well; splitting on "," alone leaves a leading space
# on every part after the first (city, os_family, browser_family).
# NOTE(review): full_address_by_ip is referenced by two derived columns; Spark
# may evaluate the HTTP-backed UDF more than once per row — confirm and cache
# or mark the UDF non-deterministic if that matters.
df = (df
      .withColumn('full_address_by_ip', get_location_by_ip(col('ip_address')))
      .withColumn('country', split(col('full_address_by_ip'), ', ')[0])
      .withColumn('city', split(col('full_address_by_ip'), ', ')[1])
      .withColumn('age_group', categorize_age_group(col('age')))
      .withColumn("hour", hour(col("activity_time")).cast(IntegerType()))
      # 05-11 morning, 12-17 afternoon, 18-22 evening, anything else
      # (including a null hour) falls through to "night".
      .withColumn("time_of_day",
                   when(col("hour").between(5, 11), "morning")
                   .when(col("hour").between(12, 17), "afternoon")
                   .when(col("hour").between(18, 22), "evening")
                   .otherwise("night"))
      .withColumn('parsed_log_info', extract_log_info(col('log')))
      .withColumn('device_type', split(col('parsed_log_info'), ', ')[0])
      .withColumn('os_family', split(col('parsed_log_info'), ', ')[1])
      .withColumn('browser_family', split(col('parsed_log_info'), ', ')[2])
)

# Keep the row ids on the driver before the id column is dropped.
ids = df.select(col('id')).collect()

# Drop the intermediate helper columns and the bookkeeping fields. The helper
# produced above is 'parsed_log_info'; drop() silently ignores unknown names,
# so the previous 'log_info' entry removed nothing.
df = df.drop('full_address_by_ip', 'parsed_log_info', 'created_at', 'status', 'username', 'id')

df.show(3)

+-------------+---+-----------+-----------+----------------+-------------------+--------------+--------------------+-----------------+-------------------+--------------------+--------------------+--------+-------------+-------------+---------+----+-----------+--------------------+-----------+---------+--------------+
|     username|age|     gender|ad_position|browsing_history|      activity_time|    ip_address|                 log|    redirect_from|        redirect_to|                  id|          created_at|  status|      country|         city|age_group|hour|time_of_day|     parsed_log_info|device_type|os_family|browser_family|
+-------------+---+-----------+-----------+----------------+-------------------+--------------+--------------------+-----------------+-------------------+--------------------+--------------------+--------+-------------+-------------+---------+----+-----------+--------------------+-----------+---------+--------------+
|monicamullins| 59|genderqueer|      popup|

In [54]:
# Show each distinct value of the country column once.
df.select('country').distinct().show()


+-------------+
|      country|
+-------------+
|    Singapore|
|      Germany|
|       France|
|       Taiwan|
|United States|
|      Unknown|
|     Thailand|
|  South Korea|
+-------------+



In [73]:
# Write the transformed rows as CSV with a header line, replacing any previous
# output. coalesce(1) collapses the data to one partition so a single part
# file is produced; Spark creates "transform_data.csv" as a directory that
# holds it. The .csv() call selects the CSV data source itself, so the legacy
# .format("com.databricks.spark.csv") alias is not needed.
(df.coalesce(1)
   .write
   .mode("overwrite")
   .option("header", "true")
   .csv("transform_data.csv"))

# from tempfile import NamedTemporaryFile
# with NamedTemporaryFile() as tmp:
#   df.coalesce(1).write.format('csv').options(header=True).save('lol.asd')

# (df
#    .repartition(1)
#    .write.format("com.databricks.spark.csv")
#    .option("header", "true")
#    .save("mydata1.csv"))
# 
# 
# (df
#    .coalesce(1)
#    .write.format("com.databricks.spark.csv")
#    .option("header", "true")
#    .save("mydata2.csv"))

# (df.write
#         .format("com.databricks.spark.csv")
#         .option("header", "false")
#         .mode("overwrite")
#         .save('aaa.csv'))
# df.unpersist()
# 
# df.persist()
# (df.write
#         .format("com.databricks.spark.csv")
#         .option("header", "false")
#         .mode("overwrite")
#         .save('aaa3.csv'))
# df.unpersist()