In [85]:
import os

import pyspark.pandas as pd
import pyspark.sql.types as types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, mean, stddev, ceil, current_date, year, datediff
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

In [86]:
# Create (or reuse) the Spark session for this notebook. The SQL Server JDBC
# driver jar is supplied through `spark.jars` so the JDBC reads/writes can load it.
spark = (
    SparkSession.builder
    .appName("Salary vs. Price")
    .config("spark.jars", "../jdbc/mssql-jdbc-12.6.1.jre8.jar")
    .getOrCreate()
)

In [87]:
# Fixed conversion rates from each supported currency code to USD.
# Kept at module level so the table is not rebuilt on every row.
_FACTORS_TO_USD = {
    "ARS": 0.00111,
    "USD": 1,
    "UYU": 0.026,
    "PEN": 0.27
}


def convert_to_common_currency(currency, price):
    """Convert ``price`` expressed in ``currency`` to USD, rounded to 2 decimals.

    Args:
        currency: Currency code; one of the keys of ``_FACTORS_TO_USD``.
        price: Numeric amount in that currency.

    Returns:
        The USD amount as a ``float``, or ``None`` when the currency code is
        not in the rate table or either argument is ``None``. Returning
        ``None`` (instead of raising ``KeyError``/``TypeError``) lets a single
        bad row become a null value rather than aborting the whole job.
    """
    factor = _FACTORS_TO_USD.get(currency)
    if factor is None or price is None:
        return None
    # Force a float result: multiplying an int price by the integer USD factor
    # would otherwise yield an int, which does not match a float return type.
    return round(factor * float(price), 2)
# Wrap the converter as a Spark UDF with a declared FloatType return type.
# This rebinds the name: after this line `convert_to_common_currency` refers to
# the UDF (to be called with Column arguments), not the plain Python function.
convert_to_common_currency = F.udf(convert_to_common_currency, FloatType())

In [88]:
def spark_read_from_db(table_name, database_name="Data"):
    """Load a SQL Server table into a Spark DataFrame over JDBC.

    Connection settings are taken from the environment so that credentials do
    not have to live in the notebook:

    * ``MSSQL_HOST``     (default ``"mssql"``)
    * ``MSSQL_PORT``     (default ``"1433"``)
    * ``MSSQL_USER``     (default ``"SA"``)
    * ``MSSQL_PASSWORD`` (default: the previous hardcoded development password)

    The defaults reproduce the previous hardcoded values so existing setups
    keep working; set the variables to override them.

    Args:
        table_name: Name of the table passed as the JDBC ``dbtable`` option.
        database_name: Database to connect to. Defaults to ``"Data"``.

    Returns:
        The DataFrame returned by the JDBC reader's ``load()``.
    """
    server_name = os.environ.get("MSSQL_HOST", "mssql")
    port = os.environ.get("MSSQL_PORT", "1433")
    username = os.environ.get("MSSQL_USER", "SA")
    # NOTE(review): fallback kept only for backward compatibility with the
    # local dev setup — prefer setting MSSQL_PASSWORD and removing the default.
    password = os.environ.get("MSSQL_PASSWORD", "YourStrongPassword123")
    url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

    return (
        spark.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        # Transport encryption is explicitly disabled for this connection.
        .option("encrypt", "false")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load()
    )

In [89]:
# Load the three source tables: properties, clients, and the link table that
# pairs a property (id_prop) with a client (id_client).
properties = spark_read_from_db("ARProperties")
clients = spark_read_from_db("ARClients")
properties_clients = spark_read_from_db("PropertiesClients")

# Join keys: each side's `id` matched against the link table's foreign key.
join_condition_clients = properties_clients["id_client"] == clients["id"]
join_condition_properties = properties_clients["id_prop"] == properties["id"]

# After the joins the `id` columns duplicate id_client / id_prop, so drop them.
columns_to_drop = ["id"]
joined_df = (
    properties_clients
    .join(clients, on=join_condition_clients, how="inner")
    .join(properties, on=join_condition_properties, how="inner")
    .drop(*columns_to_drop)
)

In [90]:
# Preview the first 10 joined rows.
# NOTE(review): the displayed columns include personal fields (first_name,
# last_name, date_of_birth, email) — consider clearing this output before sharing.
joined_df.show(10)

+-------+---------+----------------+---------+-------------+--------------------+------------------+------+---------+-----------------+----------+----------+----------+-----------+-----------+---------+--------------------+---------------+---------------+------+-----+--------+---------+-------------+---------------+--------+--------+------------+--------------------+-------------------+--------------+
|id_prop|id_client|      first_name|last_name|date_of_birth|               email|number_of_children|   sex|education|annual_income_usd|start_date|  end_date|created_on|   latitude|  longitude|  country|            province|           city|       district|estate|rooms|bedrooms|bathrooms|surface_total|surface_covered|   price|currency|price_period|               title|      property_type|operation_type|
+-------+---------+----------------+---------+-------------+--------------------+------------------+------+---------+-----------------+----------+----------+----------+-----------+----------

In [91]:
# Keep only sale listings, and exclude listings whose property type is "Terrain".
is_sale = col("operation_type") == "Sale"
not_terrain = col("property_type") != "Terrain"
joined_df = joined_df.where(is_sale & not_terrain)

In [92]:
# Discard rows missing any of the fields needed for the price conversion.
required_columns = ["currency", "price", "property_type"]
df = joined_df.dropna(how="any", subset=required_columns)


In [93]:
# Client age in completed years as of today.
# Computed from whole calendar months rather than days / 365.25: the day-count
# approximation can be off by one around a birthday (e.g. exactly one year
# after birth in a non-leap span is 365 days -> 365 / 365.25 truncates to 0).
# `date_of_birth` is parsed with to_date() first, matching the implicit
# string-to-date cast that datediff() applied before.
df = df.withColumn(
    "age",
    F.floor(
        F.months_between(current_date(), F.to_date(col("date_of_birth"))) / 12
    ).cast("int"),
)

In [94]:
# Express every listing price in USD via the conversion UDF, then keep only
# rows for which a converted price is available.
price_in_usd = convert_to_common_currency(col("currency"), col("price"))
df = (
    df.withColumn("common_currency_price", price_in_usd)
    .where(col("common_currency_price").isNotNull())
)

In [95]:
# Number of monthly salaries needed to cover the property price:
# price in USD divided by the client's monthly income (annual income / 12).
monthly_income_usd = col("annual_income_usd") / 12
df = df.withColumn("months", col("common_currency_price") / monthly_income_usd)

In [96]:
# Round the salary count up to a whole number of months.
df = df.withColumn("months", F.ceil(col("months")))

In [97]:
# Display the (column name, type) pairs of the transformed frame.
df.dtypes

[('id_prop', 'bigint'),
 ('id_client', 'bigint'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('date_of_birth', 'string'),
 ('email', 'string'),
 ('number_of_children', 'bigint'),
 ('sex', 'string'),
 ('education', 'string'),
 ('annual_income_usd', 'bigint'),
 ('start_date', 'string'),
 ('end_date', 'string'),
 ('created_on', 'string'),
 ('latitude', 'double'),
 ('longitude', 'double'),
 ('country', 'string'),
 ('province', 'string'),
 ('city', 'string'),
 ('district', 'string'),
 ('estate', 'string'),
 ('rooms', 'double'),
 ('bedrooms', 'double'),
 ('bathrooms', 'double'),
 ('surface_total', 'double'),
 ('surface_covered', 'double'),
 ('price', 'double'),
 ('currency', 'string'),
 ('price_period', 'string'),
 ('title', 'string'),
 ('property_type', 'string'),
 ('operation_type', 'string'),
 ('age', 'int'),
 ('common_currency_price', 'float'),
 ('months', 'bigint')]

In [98]:
# Preview the columns relevant to the salary-vs-price result (first 20 rows).
preview_columns = [
    "id_client",
    "city",
    "common_currency_price",
    "annual_income_usd",
    "months",
    "education",
]
df.select(*preview_columns).show(20)

+---------+----------------+---------------------+-----------------+------+---------+
|id_client|            city|common_currency_price|annual_income_usd|months|education|
+---------+----------------+---------------------+-----------------+------+---------+
|   799982|         Rosario|              19500.0|           107000|     3|Secondary|
|   799278|          Moreno|             219000.0|            49000|    54|  Primary|
|   799882|    Mar del Tuyú|              65000.0|            43000|    19|  Primary|
|   799124|   Vicente López|             239900.0|           168000|    18| Tertiary|
|   799360|   Mar del Plata|             500000.0|           131000|    46| Tertiary|
|   799521|           Tigre|             180000.0|           100000|    22|Secondary|
|   799884|    Barrio Norte|             232500.0|            47000|    60|Secondary|
|   799281|         Rosario|              89500.0|            42000|    26|Secondary|
|   799186|        Recoleta|             225000.0|    

In [99]:
# Persist the result to the HowManySalaries table of the Data_Clean database.
# Connection settings come from the environment (MSSQL_HOST, MSSQL_PORT,
# MSSQL_USER, MSSQL_PASSWORD) so credentials need not live in the notebook;
# the fallbacks reproduce the previous hardcoded values.
database_name = "Data_Clean"
table_name = "HowManySalaries"
server_name = os.environ.get("MSSQL_HOST", "mssql")
port = os.environ.get("MSSQL_PORT", "1433")
username = os.environ.get("MSSQL_USER", "SA")
# NOTE(review): fallback kept only for backward compatibility with the local
# dev setup — prefer setting MSSQL_PASSWORD and removing the default.
password = os.environ.get("MSSQL_PASSWORD", "YourStrongPassword123")
url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"

(
    df.write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", table_name)
    .option("user", username)
    .option("password", password)
    # Transport encryption is explicitly disabled for this connection.
    .option("encrypt", "false")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    # "overwrite" replaces any existing contents of the target table.
    .mode("overwrite")
    .save()
)