In [1]:
!pip install -r sample_data/requirements.txt

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 40 kB/s 
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=767939c0bcba46d7d3ce82444563320fef58c6adb6d7d105d8ee0511bbed2afd
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


# Nova seção

# Nova seção

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

path = "sample_data/Question1-Material.xlsx"
# Initializing spark session
spark = SparkSession.builder.appName("DataEngineerAssessmentQ1").getOrCreate()
# Reading the excel working sheets
q1_excel_ws1 = pd.read_excel(path, sheet_name='DBtest.dbo.DepositTransactions', header=0)
q1_excel_ws2 = pd.read_excel(path, sheet_name='DBtest.dbo.GamePlayTransactions', header=0)
# Fixing the empty field of ProviderProductName column found at the source material
q1_excel_ws2.ProviderProductName.fillna(q1_excel_ws2.mode()['ProviderProductName'][0], inplace=True)
q1_excel_ws1.CustomerEmail = q1_excel_ws1.CustomerEmail.apply(lambda e: e.replace(".om", ".com"))
# Creating pyspark dataframe of both dataframes
gameplays = spark.createDataFrame(q1_excel_ws2)
deposits = spark.createDataFrame(q1_excel_ws1)

In [3]:
depositsView = deposits.createOrReplaceTempView('depositsView')
gameplaysView = gameplays.createOrReplaceTempView('gameplaysView')

Creating the tables following the new ER diagram

In [5]:
# Creating Country Table
country = spark.sql('''
  SELECT ROW_NUMBER() OVER(
     ORDER BY cnt.CustomerCountry
  ) as CountryID, cnt.CustomerCountry as CountryName FROM
  (SELECT distinct(dv.CustomerCountry) 
   FROM depositsView dv 
  UNION
  SELECT distinct(gv.CustomerCountry)
   FROM gameplaysView gv) cnt;
''').createOrReplaceTempView("countryView")
# Creating Customer Table
customer = spark.sql('''
  SELECT ROW_NUMBER() OVER(ORDER BY cust.CustomerEmail ASC) as CustomerId 
  ,cust.*
  FROM
  (SELECT distinct(dv.CustomerEmail),
         dv.CustomerAccountStatus as CustomerStatus,
         dv.CustomerBrandName,
         cv.CountryID as CountryIdFK
         FROM depositsView dv
         LEFT JOIN countryView cv ON 
         dv.CustomerCountry = cv.CountryName
  UNION

  SELECT distinct(gv.CustomerEmail),
         gv.CustomerStatus,
         gv.CustomerBrand as CustomerBrandName,
         cv.CountryID as CountryIdFK
         FROM gameplaysView gv 
         LEFT JOIN countryView cv ON
         gv.CustomerCountry = cv.CountryName) cust
''').createOrReplaceTempView("customerView")
# Creating Payment Method Table
paymentMethod = spark.sql('''
  SELECT ROW_NUMBER() OVER(ORDER BY dv.PaymentMethodName, dv.PaymentMethodType) 
      as PaymentMethodId, 
      dv.PaymentMethodName as MethodName,
      dv.PaymentMethodType as MethodType
      FROM depositsView dv
      GROUP BY dv.PaymentMethodName, dv.PaymentMethodType 
''').createOrReplaceTempView("paymentMethodView")
# Creating Payment Status Table
paymentStatus = spark.sql('''
  SELECT ROW_NUMBER() OVER(ORDER BY dv.PaymentStatusName) as PaymentStatusId, 
         dv.PaymentStatusName as StatusName,
         dv.PaymentStatusDescription as Description
         FROM depositsView dv
         GROUP BY  dv.PaymentStatusName, dv.PaymentStatusDescription 
''').createOrReplaceTempView("paymentStatusView")
# Creating Payments Table
payments = spark.sql('''
  SELECT 
         ROW_NUMBER() OVER(ORDER BY ps.PaymentStatusId, pm.PaymentMethodId) as
         PaymentId,
         ps.PaymentStatusId as PaymentStatusIdFK,
         pm.PaymentMethodId as PaymentMethodIdFK
         FROM depositsView dv
         LEFT JOIN paymentMethodView pm ON pm.MethodName = dv.PaymentMethodName
         LEFT JOIN paymentMethodView pm2 ON pm2.MethodType = dv.PaymentMethodType
         LEFT JOIN paymentStatusView ps ON ps.StatusName = dv.PaymentStatusName
         GROUP BY ps.PaymentStatusId, pm.PaymentMethodId;
''').createOrReplaceTempView("paymentsView")
# Creating Products table
products = spark.sql('''
  SELECT 
        ROW_NUMBER() OVER(ORDER BY gv.ProviderProductName) as
        ProductId,
        gv.ProviderProductName as ProductName
        FROM gameplaysView gv
        GROUP BY gv.ProviderProductName;
''').createOrReplaceTempView("productsView")
# Creating Providers table
providers = spark.sql('''
  SELECT 
        ROW_NUMBER() OVER(ORDER BY gv.ProviderName) as ProviderId,
        gv.ProviderName,
        pr.ProductId as ProductIdFK
        FROM gameplaysView gv
        LEFT JOIN productsView pr ON gv.ProviderProductName = pr.ProductName
        GROUP BY gv.ProviderName, pr.ProductId 
''').createOrReplaceTempView("providersView")
# Creating Devices Table
devices = spark.sql('''
  SELECT 
      ROW_NUMBER() OVER(ORDER BY gv.DeviceName) DeviceId,
      gv.DeviceName
      FROM gameplaysView gv
      GROUP BY gv.DeviceName
''').createOrReplaceTempView("devicesView")
depositsTbl = spark.sql('''
  SELECT 
      dv.CalendarDate,
      ctv.CustomerId as CustomerIdFK,
      pmv.PaymentId as PaymentIdFK,
      dv.amount_eur
      FROM depositsView dv
      LEFT JOIN customerView ctv ON  dv.CustomerEmail = ctv.CustomerEmail
      LEFT JOIN paymentStatusView psv ON dv.PaymentStatusName = psv.StatusName
      LEFT JOIN paymentMethodView ptv ON dv.PaymentMethodName = ptv.MethodName
      LEFT JOIN paymentsView pmv ON (psv.PaymentStatusId = pmv.PaymentStatusIdFK
      AND ptv.PaymentMethodId = pmv.PaymentMethodIdFK)
''').createOrReplaceTempView("depositsTblView")
gameplayTranTbl = spark.sql('''
  SELECT 
      gv.CalendarDate,
      ctv.CustomerId as CustomerIdFK,
      prv.ProviderId as ProviderIdFK,
      dev.DeviceId as DeviceIdFK,
      gv.rounds,
      gv.turnover_EUR,
      gv.gameWin_EUR as gamewin_EUR,
      gv.`bonus cost` as bonus_cost,
      gv.totalAccountingRevenue_EUR
      FROM gameplaysView gv
      LEFT JOIN customerView ctv ON  gv.CustomerEmail = ctv.CustomerEmail
      LEFT JOIN providersView prv ON gv.ProviderName = prv.ProviderName
      LEFT JOIN productsView prd ON prv.ProductIdFK = prd.ProductId
      LEFT JOIN devicesView dev ON gv.DeviceName = dev.DeviceName  
''').createOrReplaceTempView("gameplaysTblView")

Which are the 10 largest completed deposit transactions? Extract the amount, customer
email, customer brand name and calendar date.

In [10]:
# spark.sql('''
#   SELECT dv.CustomerEmail,
#          dv.CustomerBrandName,
#          dv.CalendarDate,
#          dv.amount_eur FROM depositsView dv
#    ORDER BY amount_eur DESC
#    limit 10;
# ''').show()

spark.sql('''
  SELECT cust.CustomerEmail,
         cust.CustomerBrandName,
         dv.CalendarDate,
         dv.amount_eur
         FROM depositsTblView dv
         LEFT JOIN customerView cust ON  dv.CustomerIdFK = cust.CustomerId
         ORDER BY amount_eur DESC
         limit 10; 
''').show()

+--------------------+-----------------+------------+-----------+
|       CustomerEmail|CustomerBrandName|CalendarDate| amount_eur|
+--------------------+-----------------+------------+-----------+
|betsson7@testbets...|         BrandABC|    20210105| 99.9542194|
|betsson6@testbets...|         BrandGHI|    20210101| 99.3935206|
|betsson3@testbets...|         BrandABC|    20210102|98.55062404|
|betsson3@testbets...|         BrandABC|    20210105|96.54250411|
|betsson3@testbets...|         BrandABC|    20210102| 95.2215573|
|betsson@testbetss...|         BrandABC|    20210104|93.26584767|
|betsson@testbetss...|         BrandABC|    20210105|92.77897687|
|betsson@testbetss...|         BrandABC|    20210102|89.28675536|
|betsson4@testbets...|         BrandDEF|    20210102| 88.7989281|
|betsson5@testbets...|         BrandGHI|    20210102|85.76720982|
+--------------------+-----------------+------------+-----------+



What is the total number and amount of failed deposit transactions per brand? Extract
total number, amount, customer brand name, paymentstatus.

In [13]:
# spark.sql('''
#   SELECT COUNT(dv.PaymentStatusName) as number,
#          SUM(dv.amount_eur) as amount,
#          dv.CustomerBrandName
#    FROM depositsView dv
#    WHERE dv.PaymentStatusName = "Failed"
#    GROUP BY
#     dv.CustomerBrandName;
# ''').show()

spark.sql('''
  SELECT COUNT(ps.StatusName) as number,
         SUM(dv.amount_eur) as amount,
         cust.CustomerBrandName
  FROM depositsTblView dv 
  LEFT JOIN customerView cust ON  dv.CustomerIdFK = cust.CustomerId
  LEFT JOIN paymentsView pv ON dv.PaymentIdFK = pv.PaymentId
  LEFT JOIN paymentStatusView ps ON pv.PaymentStatusIdFK = ps.PaymentStatusId
  WHERE ps.StatusName = "Failed" 
  GROUP BY 
     cust.CustomerBrandName;
''').show()

+------+------------------+-----------------+
|number|            amount|CustomerBrandName|
+------+------------------+-----------------+
|     3|      179.53198553|         BrandABC|
|     1|       85.76720982|         BrandGHI|
|     3|137.54915506999998|         BrandDEF|
+------+------------------+-----------------+



How much daily turnover and accounting revenue did each brand generate per product
in the first 6 days of the year? Extract turnover, accountingrevenue, brand.

In [19]:
# spark.sql('''
#   SELECT gv.CalendarDate,
#          SUM(gv.turnover_EUR) as turnover ,
#          SUM(gv.totalAccountingRevenue_EUR) as accountingrevenue,
#          TRIM(CONCAT(gv.CustomerBrand, " - " ,gv.ProviderProductName)) as Brand_Product
#    FROM gameplaysView gv
#    WHERE gv.CalendarDate >= (SELECT MIN(hgv.CalendarDate) as min_date FROM gameplaysView hgv ) AND
#          gv.CalendarDate <= (SELECT MIN(hgv.CalendarDate) as min_date FROM gameplaysView hgv) + 5
#    GROUP BY gv.CalendarDate,
#             gv.CustomerBrand,
#             gv.ProviderProductName 
#    ORDER BY gv.CalendarDate ASC; 
# ''').show()

spark.sql('''
   SELECT gv.CalendarDate,
          SUM(gv.turnover_EUR) as turnover,
          SUM(gv.totalAccountingRevenue_EUR) as accountingrevenue,
          TRIM(CONCAT(cust.CustomerBrandName, " - " ,prd.ProductName)) as Brand_Product
   FROM gameplaysTblView gv
   LEFT JOIN customerView cust ON gv.CustomerIdFK = cust.CustomerId
   LEFT JOIN providersView prv ON gv.ProviderIdFK = prv.ProviderId
   LEFT JOIN productsView  prd ON prv.ProductIdFK = prd.ProductId
   WHERE gv.CalendarDate >= (SELECT MIN(hgv.CalendarDate) as min_date FROM gameplaysTblView hgv ) AND
         gv.CalendarDate <= (SELECT MIN(hgv.CalendarDate) as min_date FROM gameplaysTblView hgv) + 5
   GROUP BY gv.CalendarDate,
            cust.CustomerBrandName,
            prd.ProductName
   ORDER BY gv.CalendarDate ASC;
''').show()

+------------+------------------+-------------------+--------------------+
|CalendarDate|          turnover|  accountingrevenue|       Brand_Product|
+------------+------------------+-------------------+--------------------+
|    20210101|      337.28008695|-56.793251178999995|BrandABC - Games ...|
|    20210101|       60.33177977|-24.093371728999998|BrandGHI - Games ...|
|    20210101|      42.530695043|      -10.799342924|    BrandDEF - Poker|
|    20210101|     127.821704504|-18.113289606000002|    BrandABC - Poker|
|    20210101|       59.49077742|       -40.22290398|BrandDEF - Sports...|
|    20210101|111.93280089999999|      -30.732573175|BrandDEF - Games ...|
|    20210102|       77.99540158| -44.93602170700001|BrandABC - Games ...|
|    20210102|        46.2933075|       -7.546592443|BrandABC - Sports...|
|    20210102|       70.52897896|       -11.54879478|    BrandGHI - Poker|
|    20210102|      265.03056265|-112.69361952699998|    BrandABC - Poker|
|    20210102|      173.9

What is the average gamewin per product? Extract the average gamewin and product

In [23]:
# spark.sql('''
#   SELECT AVG(gv.gameWin_EUR) as gamewin,
#          gv.ProviderProductName as product
#   FROM gameplaysView gv
#   GROUP BY gv.ProviderProductName;
# ''').show()

spark.sql('''
  SELECT AVG(gv.gamewin_EUR) as gamewin,
         prd.ProductName
  FROM gameplaysTblView gv
  LEFT JOIN providersView prv ON gv.ProviderIdFK = prv.ProviderId
  LEFT JOIN productsView  prd ON prv.ProductIdFK = prd.ProductId
  GROUP BY prd.ProductName;
''').show()

+-------------------+---------------+
|            gamewin|    ProductName|
+-------------------+---------------+
|    -5.562200804625|     Sportsbook|
|-1.3086439718965515|          Poker|
| -6.951025534063831|Games of Chance|
+-------------------+---------------+

