In [93]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import json
from pyspark.sql.functions import col

In [62]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Pyspark Execcises") \
                    .getOrCreate()

sc = spark.sparkContext

In [63]:
data_path = "data/german.data"

In [64]:
# if you don't have wget (on a win machine, for example) just download the data manually
# and put into $data_path
def download_data_if_needed(data_path):
    DATASET_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/statlog/german/german.data"
    if not os.path.isfile(data_path):
        import wget
        wget.download(DATASET_URL, "data/")
    else:
        print('data is already in place')

In [65]:
download_data_if_needed(data_path)

data is already in place


In [100]:
def show(df):
    return df.limit(7).toPandas().head() # читерим в целях демонстрации

In [67]:
sch = [('existingchecking', StringType()),  #1
       ('duration',IntegerType()), # 2
       ('credithistory',StringType()), # 3
       ('purpose',StringType()), # 4
       ('creditamount',IntegerType()), # 5
       ('savings',StringType()), # 6
       ('employmentsince',StringType()),  # 7
       ('installmentrate',IntegerType()), # 8
       ('statussex',StringType()),  # 9
       ('otherdebtors',StringType()),  # 10
       ('residencesince',IntegerType()), # 11
       ('property',StringType()),  # 12
       ('age',IntegerType()), # 13
       ('otherinstallmentplans',StringType()),  # 14
       ('housing',StringType()),  # 15
       ('existingcredits',IntegerType()), # 16
       ('job',StringType()),  # 17
       ('peopleliable',IntegerType()), # 18
       ('telephone',StringType()),  # 19
       ('foreignworker',StringType()),  # 20
       ('classification', IntegerType())]

# wrapper 
schema = StructType([StructField(name, column_type, True) for name, column_type in sch])


# TASK 1: read data

df = spark.read \
          .option("delimiter", " ") \
          .csv(data_path, schema = schema)

show(df)

Unnamed: 0,existingchecking,duration,credithistory,purpose,creditamount,savings,employmentsince,installmentrate,statussex,otherdebtors,...,property,age,otherinstallmentplans,housing,existingcredits,job,peopleliable,telephone,foreignworker,classification
0,A11,6,A34,A43,1169,A65,A75,4,A93,A101,...,A121,67,A143,A152,2,A173,1,A192,A201,1
1,A12,48,A32,A43,5951,A61,A73,2,A92,A101,...,A121,22,A143,A152,1,A173,1,A191,A201,2
2,A14,12,A34,A46,2096,A61,A74,2,A93,A101,...,A121,49,A143,A152,1,A172,2,A191,A201,1
3,A11,42,A32,A42,7882,A61,A74,2,A93,A103,...,A122,45,A143,A153,1,A173,2,A191,A201,1
4,A11,24,A33,A40,4870,A61,A73,3,A93,A101,...,A124,53,A143,A153,2,A173,2,A191,A201,2


In [68]:
with open("data/german_mapping.json", "r") as mapping_file:
    mapping = json.load(mapping_file)

In [138]:
# TASK 2: remap values

df = df.replace(to_replace=mapping)

In [141]:
#df.show()# -- плохо печатает не в терминале

In [71]:
show(df)

Unnamed: 0,existingchecking,duration,credithistory,purpose,creditamount,savings,employmentsince,installmentrate,statussex,otherdebtors,...,property,age,otherinstallmentplans,housing,existingcredits,job,peopleliable,telephone,foreignworker,classification
0,... < 0 DM,6,critical account/other credits existing (not a...,radio/television,1169,unknown/ no savings account,>= 7 years,4,male - single,none,...,real estate,67,none,own,2,skilled employee/official,1,"yes, registered under the customers name",local worker,1
1,0 <= ... < 200 DM,48,existing credits paid back duly till now,radio/television,5951,... < 100 DM,1 <= < 4 years,2,female - divorced/separated/married,none,...,real estate,22,none,own,1,skilled employee/official,1,none,local worker,2
2,no checking account,12,critical account/other credits existing (not a...,education,2096,... < 100 DM,4 <= < 7 years,2,male - single,none,...,real estate,49,none,own,1,unskilled - resident,2,none,local worker,1
3,... < 0 DM,42,existing credits paid back duly till now,furniture/equipment,7882,... < 100 DM,4 <= < 7 years,2,male - single,guarantor,...,if not A121 : building society savings agreeme...,45,none,for free,1,skilled employee/official,2,none,local worker,1
4,... < 0 DM,24,delay in paying off in the past,car (new),4870,... < 100 DM,1 <= < 4 years,3,male - single,none,...,unknown / no property,53,none,for free,2,skilled employee/official,2,none,local worker,2


In [72]:
# TASK 3:
# создать новую колонку duration_category
# 0-12 short
# 12-24 medium
# 24+ long

# Answer:
from pyspark.sql.functions import col, when

df = df.withColumn("categorical_duration", when(col("duration") <= 12, "short") \
                                          .when((12 < col("duration")) & (col("duration") <= 24), "medium") \
                                          .otherwise("long"))
show(df)

Unnamed: 0,existingchecking,duration,credithistory,purpose,creditamount,savings,employmentsince,installmentrate,statussex,otherdebtors,...,age,otherinstallmentplans,housing,existingcredits,job,peopleliable,telephone,foreignworker,classification,categorical_duration
0,... < 0 DM,6,critical account/other credits existing (not a...,radio/television,1169,unknown/ no savings account,>= 7 years,4,male - single,none,...,67,none,own,2,skilled employee/official,1,"yes, registered under the customers name",local worker,1,short
1,0 <= ... < 200 DM,48,existing credits paid back duly till now,radio/television,5951,... < 100 DM,1 <= < 4 years,2,female - divorced/separated/married,none,...,22,none,own,1,skilled employee/official,1,none,local worker,2,long
2,no checking account,12,critical account/other credits existing (not a...,education,2096,... < 100 DM,4 <= < 7 years,2,male - single,none,...,49,none,own,1,unskilled - resident,2,none,local worker,1,short
3,... < 0 DM,42,existing credits paid back duly till now,furniture/equipment,7882,... < 100 DM,4 <= < 7 years,2,male - single,guarantor,...,45,none,for free,1,skilled employee/official,2,none,local worker,1,long
4,... < 0 DM,24,delay in paying off in the past,car (new),4870,... < 100 DM,1 <= < 4 years,3,male - single,none,...,53,none,for free,2,skilled employee/official,2,none,local worker,2,medium


In [73]:
# TASK 4:
# выполнить SQL запрос, который работает по след логике
# categorical_duration = 'short' или categorical_duration = 'medium'

# Answer:
df.createOrReplaceTempView("DF")

spark.sql("""SELECT *
             FROM DF
             WHERE categorical_duration = 'short' or categorical_duration = 'medium'
          """)
show(df)

Unnamed: 0,existingchecking,duration,credithistory,purpose,creditamount,savings,employmentsince,installmentrate,statussex,otherdebtors,...,age,otherinstallmentplans,housing,existingcredits,job,peopleliable,telephone,foreignworker,classification,categorical_duration
0,... < 0 DM,6,critical account/other credits existing (not a...,radio/television,1169,unknown/ no savings account,>= 7 years,4,male - single,none,...,67,none,own,2,skilled employee/official,1,"yes, registered under the customers name",local worker,1,short
1,0 <= ... < 200 DM,48,existing credits paid back duly till now,radio/television,5951,... < 100 DM,1 <= < 4 years,2,female - divorced/separated/married,none,...,22,none,own,1,skilled employee/official,1,none,local worker,2,long
2,no checking account,12,critical account/other credits existing (not a...,education,2096,... < 100 DM,4 <= < 7 years,2,male - single,none,...,49,none,own,1,unskilled - resident,2,none,local worker,1,short
3,... < 0 DM,42,existing credits paid back duly till now,furniture/equipment,7882,... < 100 DM,4 <= < 7 years,2,male - single,guarantor,...,45,none,for free,1,skilled employee/official,2,none,local worker,1,long
4,... < 0 DM,24,delay in paying off in the past,car (new),4870,... < 100 DM,1 <= < 4 years,3,male - single,none,...,53,none,for free,2,skilled employee/official,2,none,local worker,2,medium


# Продолжаем работать с запросами

In [78]:
empl = spark.read \
          .option("delimiter", ",") \
          .load("queries_data/employee.csv",
                 format='com.databricks.spark.csv', 
                 header='true', 
                 inferSchema='true')

show(empl)

Unnamed: 0,ssn,fname,lname,sex,salary,super_ssn,dno
0,1,peter,skachovsky,m,1000,2.0,10
1,2,ronald,dehaan,m,2000,4.0,10
2,3,alina,petrova,f,1500,2.0,10
3,4,sergey,paramonov,m,4000,,20
4,5,andra,busoniu,f,2500,,30


In [86]:
works_on = spark.read \
          .option("delimiter", ",") \
          .load("queries_data/works_on.csv",
                 format='com.databricks.spark.csv', 
                 header='true', 
                 inferSchema='true')
show(works_on)

Unnamed: 0,essn,pno,hours
0,1,1,10
1,1,2,20
2,1,3,5
3,1,4,10
4,1,5,3


In [87]:
project = spark.read \
          .option("delimiter", ",") \
          .load("queries_data/project.csv",
                 format='com.databricks.spark.csv', 
                 header='true', 
                 inferSchema='true')
show(project)

Unnamed: 0,pname,pnumber,plocation,dnum
0,A,1,Dresden,10
1,A,2,Bolzano,10
2,C,3,Moscow,20
3,X,4,Vienna,30
4,Z,5,Houston,20


## Question 1
Question: Retrieve the names of all employees in department 10 who work more than 10 hours per week on the ProductX project!

SQL code to guide:
``` SQL
SELECT Fname, Lname
FROM EMPLOYEE, WORKS_ON, PROJECT 
WHERE Ssn=Essn
AND Pno=Pnumber
AND Dno=10
AND Hours>=10
AND Pname=’X’
```

In [113]:
# filters
emp_dno10 = empl.filter("dno == 10")
work_more10 = works_on.filter("hours >= 10")
projectX  = project.filter("pname == 'X'")

# join
combined = emp_dno10.join(work_more10, col("ssn") == col("essn")).join(projectX, col("pno") == col("pnumber"))

# project on names and take unique
output = combined.select("fname","lname").distinct()

#output 
show(output)

Unnamed: 0,fname,lname
0,peter,skachovsky


## Question 2
Question: List the names of all employees who have a
dependent with the same first name as themselves

SQL code to guide:
``` SQL
SELECT Fname, Lname
FROM EMPLOYEE, DEPENDENT
WHERE Ssn=Essn
AND Dependent_name=Fname
```

In [118]:
dependent = spark.read \
          .option("delimiter", ",") \
          .load("queries_data/dependent.csv",
                 format='com.databricks.spark.csv', 
                 header='true', 
                 inferSchema='true')

output = empl.join(dependent, (col("essn") == col("ssn")) & (col("fname") == col("dependent_name")))
show(output)

Unnamed: 0,ssn,fname,lname,sex,salary,super_ssn,dno,essn,dependent_name,sex.1
0,2,ronald,dehaan,m,2000,4,10,2,ronald,m


## Question  3
Question: For each project, list the project name and the total hours per week (by all employees) spent on that project

SQL code to guide:
``` SQL
SELECT Pno, Pname, SUM(Hours)
FROM PROJECT JOIN WORKS_ON ON Pno=Pnumber 
GROUP BY Pno, Pname
```

In [129]:
combined = project.join(works_on, col("pno") == col("pnumber"))
aggregated = combined.groupby("pno").agg({"hours": "sum" }).withColumnRenamed("sum(hours)","hours_total")
output = aggregated
show(output)

Unnamed: 0,pno,hours_total
0,1,20
1,3,5
2,5,13
3,4,50
4,2,30


## Question  4

Question: List the last names of all department managers who have no dependents

SQL code to guide:
``` SQL
SELECT Fname, Lname
FROM EMPLOYEE, DEPARTMENT 
WHERE Ssn=Mgr_ssn
AND NOT EXISTS (SELECT *
                FROM DEPENDENT 
                WHERE Essn = Ssn)
```

In [137]:
#Left Anti Join
# This join is like df1-df2, as it selects all rows from df1 that are not present in df2.   
# df  = df1.join(df2, on=['key'], how='left_anti')

department = spark.read \
          .option("delimiter", ",") \
          .load("queries_data/department.csv",
                 format='com.databricks.spark.csv', 
                 header='true', 
                 inferSchema='true')

combined = empl.join(department, col("ssn") == col("mgr_ssn") )
output = combined.join(dependent, col("ssn") == col("essn"), "left_anti").select("fname","lname")
show(output)

Unnamed: 0,fname,lname
0,sergey,paramonov
1,andra,busoniu
