In [16]:
from datetime import datetime, timedelta, timezone
import requests
import json

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import Row
from datetime import date
from pyspark.sql.functions import lit
import pyspark

In [17]:
spark = SparkSession.builder \
    .appName("Fetch_PGSoft_to_HDFS") \
    .master("local") \
    .enableHiveSupport() \
    .getOrCreate()

In [18]:
PGSOFT_OLD_VERSION_TABLE ='pgsoft_old_version'

secret_key = ""
operator_token = ""
pg_history_url = ""

history_api = '/v2/Bet/GetHistory'

# url = f"{pg_history_url}{history_api}" 

url = "http://localhost:8800/pg_soft" # MockAPI

### This can be executed in AIRFLOW then pass it via xcom

In [19]:
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_pgversion():
    conn_collector_pg_hook = PostgresHook(postgres_conn_id='collector_conn_id')
    query = """
        SELECT row_version FROM {0} LIMIT 1
    """.format(PGSOFT_OLD_VERSION_TABLE)

    df = conn_collector_pg_hook.get_pandas_df(query)
    if not df.empty:
        latest_row_version = df['row_version'].iloc[0]
        return latest_row_version
    else:
        return None

In [20]:
# def get_some_value(**kwargs):
#     some_value = 10
#     return some_value

# task1 = PythonOperator(task_id='run_task_1',
#                        python_callable=get_some_value,
#                        provide_context=True,
#                        dag=dag)

# task2 = SparkSubmitOperator(
#     task_id='run_sparkSubmit_job',
#     conn_id='spark_default',
#     java_class='com.example',
#     application='example.jar',
#     name='airflow-spark-job',
#     verbose=True,
#     application_args=["{{ti.xcom_pull(task_ids='get_pgsoft_row_version')}}"],  
#     conf={'master':'yarn'},
#     dag=dag,
# )

# Grab the row version

In [21]:
import sys
latest_row_version = sys.argv[1]

In [22]:
# try:
form_data = {
    "secret_key":     secret_key,
    "operator_token": operator_token,
    "bet_type":        "1",
    "row_version":  latest_row_version,
    "count":          "5000"
}

print(f"Start download pg: row_version {latest_row_version}")
response = requests.post(url, data=form_data)
response.raise_for_status() 
print(f"response contains {len(response.json())} rows")

json_data = response.json()
df = spark.createDataFrame(json_data) 

# options = { 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}

# df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()

# if response.status_code == 404:
#     print("Error 404: Not Found")
# else:
#     json_content = response.json()
#     print(json_content)

# except requests.exceptions.RequestException as err:
# print("Request error:", err)

Start download pg: row_version --ip=127.0.0.1
response contains 5006 rows


In [23]:
df.printSchema()

root
 |-- balanceAfter: long (nullable = true)
 |-- balanceBefore: long (nullable = true)
 |-- betAmount: long (nullable = true)
 |-- betId: long (nullable = true)
 |-- betTime: string (nullable = true)
 |-- betType: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- gameId: long (nullable = true)
 |-- id: long (nullable = true)
 |-- jackpotRtpContributionAmount: long (nullable = true)
 |-- jackpotWinAmount: long (nullable = true)
 |-- parentBetId: long (nullable = true)
 |-- platform: long (nullable = true)
 |-- playerName: string (nullable = true)
 |-- rowVersion: long (nullable = true)
 |-- transactionType: long (nullable = true)
 |-- winAmount: long (nullable = true)



In [24]:
from pyspark.sql.functions import to_timestamp, year, quarter, date_format, col

In [25]:
df.describe().show()

23/08/11 11:43:06 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 0:>                                                          (0 + 1) / 1]

+-------+-----------------+------------------+------------------+------------------+--------------------+-----------------+--------+-----------------+------------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+-----------------+------------------+
|summary|     balanceAfter|     balanceBefore|         betAmount|             betId|             betTime|          betType|currency|           gameId|                id|jackpotRtpContributionAmount| jackpotWinAmount|       parentBetId|          platform|playerName|       rowVersion|  transactionType|         winAmount|
+-------+-----------------+------------------+------------------+------------------+--------------------+-----------------+--------+-----------------+------------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+-----------------+------------------+
|  count|             5003|          

                                                                                

In [26]:
out_df = df \
    .withColumn("betTime",to_timestamp(df["betTime"])) \
    .withColumn("year", date_format(col("betTime"), "yyyy")) \
    .withColumn("quarter", date_format(col("betTime"), "Q")) 

In [27]:
out_df.describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+--------+-----------------+------------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+-----------------+------------------+------------------+-----------------+
|summary|     balanceAfter|     balanceBefore|         betAmount|             betId|          betType|currency|           gameId|                id|jackpotRtpContributionAmount| jackpotWinAmount|       parentBetId|          platform|playerName|       rowVersion|  transactionType|         winAmount|              year|          quarter|
+-------+-----------------+------------------+------------------+------------------+-----------------+--------+-----------------+------------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+-----------------+------------------+------------------+--------

In [28]:
out_df[out_df['gameId'] == 123 ].describe().show()

+-------+------------------+------------------+------------------+-----------------+------------------+--------+------+-----------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+------------------+------------------+------------------+------------------+
|summary|      balanceAfter|     balanceBefore|         betAmount|            betId|           betType|currency|gameId|               id|jackpotRtpContributionAmount| jackpotWinAmount|       parentBetId|          platform|playerName|       rowVersion|   transactionType|         winAmount|              year|           quarter|
+-------+------------------+------------------+------------------+-----------------+------------------+--------+------+-----------------+----------------------------+-----------------+------------------+------------------+----------+-----------------+------------------+------------------+------------------+------------------+
|  count|       

In [29]:
out_df.printSchema()

root
 |-- balanceAfter: long (nullable = true)
 |-- balanceBefore: long (nullable = true)
 |-- betAmount: long (nullable = true)
 |-- betId: long (nullable = true)
 |-- betTime: timestamp (nullable = true)
 |-- betType: long (nullable = true)
 |-- currency: string (nullable = true)
 |-- gameId: long (nullable = true)
 |-- id: long (nullable = true)
 |-- jackpotRtpContributionAmount: long (nullable = true)
 |-- jackpotWinAmount: long (nullable = true)
 |-- parentBetId: long (nullable = true)
 |-- platform: long (nullable = true)
 |-- playerName: string (nullable = true)
 |-- rowVersion: long (nullable = true)
 |-- transactionType: long (nullable = true)
 |-- winAmount: long (nullable = true)
 |-- year: string (nullable = true)
 |-- quarter: string (nullable = true)



In [30]:
tblLocation = './user/hive/datalake/wagers/pgsoft'
out_df.write.partitionBy('year', 'quarter').mode('append').parquet(tblLocation)

                                                                                

In [31]:
# tblLocation = './user/hive/datalake/wagers/pgsoft2'
# out_df.write.partitionBy('year_quarter').mode('append').parquet(tblLocation)

In [32]:
# tblLocation = './user/hive/datalake/wagers/pgsoft3'
# out_df.write.partitionBy('year', 'month', 'day').mode('append').parquet(tblLocation)