Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load Elavon Data to BigQuery #2376

Merged
merged 46 commits into from Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b05ca05
Beginnings of Elavon SFTP data extraction
SorenSpicknall Mar 7, 2023
7868f53
Write Elavon data successfully to GCS
SorenSpicknall Mar 8, 2023
491003a
Only get files from task date
SorenSpicknall Mar 8, 2023
8bc4a3e
Inline comments
SorenSpicknall Mar 8, 2023
3408d95
Recreate work as plugin operator
SorenSpicknall Mar 9, 2023
001d459
Update argument handling
SorenSpicknall Mar 9, 2023
565246f
Align date formats
SorenSpicknall Mar 9, 2023
94913b6
Remove datestamp from consideration
SorenSpicknall Mar 10, 2023
cdb37cc
Cleanup references to source + local filename
SorenSpicknall Mar 10, 2023
cf1f217
Prototype writing raw data to bucket
SorenSpicknall Mar 10, 2023
6935825
Correctly write raw files to GCS
SorenSpicknall Mar 10, 2023
22170cf
Remove unused elavon_data_loader job files
SorenSpicknall Mar 13, 2023
df1180b
Split extract process into two jobs
SorenSpicknall Mar 13, 2023
46215c4
Get initial two-step process working
SorenSpicknall Mar 14, 2023
46cfbb9
Add to external tables
SorenSpicknall Mar 15, 2023
e8f6bda
Update dates not automatically interpreted by BQ
SorenSpicknall Mar 15, 2023
c06b263
Initial models
SorenSpicknall Mar 15, 2023
1246670
Small model fixes
SorenSpicknall Mar 16, 2023
e791b1d
Update DAG start date
SorenSpicknall Mar 16, 2023
cb76985
PR feedback round 1
SorenSpicknall Mar 16, 2023
932266e
Initial one-day lookback when missing schedule data at time of RT val…
SorenSpicknall Mar 16, 2023
2fbd455
Revert 932266e
SorenSpicknall Mar 16, 2023
a21ab0f
Make all external table fields strings
SorenSpicknall Mar 28, 2023
99f1f86
Merge branch 'main' into soren-load_elavon_data_to_bigquery
SorenSpicknall Mar 31, 2023
3abde6b
Switch to multi-DAG approach
SorenSpicknall Mar 31, 2023
bc0f82d
Move execution into PythonOperators
SorenSpicknall Mar 31, 2023
4a1825f
Switch staging table naming scheme
SorenSpicknall Mar 31, 2023
37c2305
Add uniqueness test on trn_ref_num
SorenSpicknall Mar 31, 2023
e940285
Switch to using get_fs instead of initializing using imported constru…
SorenSpicknall Apr 3, 2023
004e81b
Switch to endswith for filename string filter
SorenSpicknall Apr 3, 2023
2dac250
Add ts to raw file export path
SorenSpicknall Apr 3, 2023
5267098
Fix filepath reference
SorenSpicknall Apr 3, 2023
a85535b
Remove unused operator types
SorenSpicknall Apr 3, 2023
19c9960
Read in files more consistently
SorenSpicknall Apr 3, 2023
9f80ee5
Switch database reference in source table YAML
SorenSpicknall Apr 3, 2023
d637e0d
Update dbt for updated source behavior
SorenSpicknall Apr 3, 2023
56cd88e
Map connection information to variables
SorenSpicknall Apr 5, 2023
855b3f8
Merge branch 'main' into soren-load_elavon_data_to_bigquery
SorenSpicknall Apr 5, 2023
609c0c1
Additional comment
SorenSpicknall Apr 5, 2023
8a8ae54
Coerce strings in CSV reading
SorenSpicknall Apr 6, 2023
1be64ea
Change interpretation of ts string
SorenSpicknall Apr 6, 2023
8563ed3
Add a couple env variables
SorenSpicknall Apr 7, 2023
51e6204
Update docker-compose with new variables
SorenSpicknall Apr 7, 2023
c11411b
Review fixes
SorenSpicknall Apr 7, 2023
5774f01
Parsed file bucket rename
SorenSpicknall Apr 7, 2023
d0bfbe9
Modify source bucket for external table
SorenSpicknall Apr 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 105 additions & 0 deletions airflow/dags/create_external_tables/payments/elavon/transactions.yml
@@ -0,0 +1,105 @@
operator: operators.ExternalTable
bucket: "{{ env_var('CALITP_BUCKET__ELAVON_PARSED') }}"
source_objects:
- "transactions/*.jsonl.gz"
destination_project_dataset_table: "external_elavon.transactions"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "transactions/{dt:DATE}/{execution_ts:TIMESTAMP}/"
schema_fields:
- name: payment_reference
type: STRING
- name: payment_date
type: STRING
- name: account_number
type: STRING
- name: routing_number
type: STRING
- name: fund_amt
type: STRING
- name: batch_reference
type: STRING
- name: batch_type
type: STRING
- name: customer_batch_reference
type: STRING
- name: customer_name
type: STRING
- name: merchant_number
type: STRING
- name: external_mid
type: STRING
- name: store_number
mode: NULLABLE
type: STRING
- name: chain
type: STRING
- name: batch_amt
type: STRING
- name: amount
type: STRING
- name: surchg_amount
type: STRING
- name: convnce_amt
type: STRING
- name: card_type
type: STRING
- name: charge_type
type: STRING
- name: charge_type_description
type: STRING
- name: card_plan
type: STRING
- name: card_no
type: STRING
- name: chk_num
mode: NULLABLE
type: STRING
- name: transaction_date
type: STRING
- name: settlement_date
type: STRING
- name: authorization_code
type: STRING
- name: chargeback_control_no
mode: NULLABLE
type: STRING
- name: roc_text
mode: NULLABLE
type: STRING
- name: trn_aci
mode: NULLABLE
type: STRING
- name: card_scheme_ref
type: STRING
- name: trn_ref_num
type: STRING
- name: settlement_method
type: STRING
- name: currency_code
type: STRING
- name: cb_acq_ref_id
mode: NULLABLE
type: STRING
- name: chgbk_rsn_code
mode: NULLABLE
type: STRING
- name: chgbk_rsn_desc
mode: NULLABLE
type: STRING
- name: mer_ref
mode: NULLABLE
type: STRING
- name: purch_id
type: STRING
- name: cust_cod
type: STRING
- name: trn_arn
type: STRING
- name: term_id
type: STRING
- name: ent_num
type: STRING
16 changes: 16 additions & 0 deletions airflow/dags/parse_elavon/METADATA.yml
@@ -0,0 +1,16 @@
description: "Process raw files from Elavon into a BQ-ready JSONL form"
schedule_interval: "0 2 * * *"
tags:
- all_gusty_features
default_args:
owner: airflow
depends_on_past: False
start_date: "2023-03-22"
catchup: False
email:
- "soren.s@jarv.us"
email_on_failure: True
pool: default_pool
concurrency: 50
wait_for_defaults:
timeout: 3600
105 changes: 105 additions & 0 deletions airflow/dags/parse_elavon/elavon_to_gcs_jsonl.py
@@ -0,0 +1,105 @@
# ---
# python_callable: process_elavon_data_to_jsonl
# provide_context: true
# ---
import gzip
import os
from typing import ClassVar, List, Optional

import pandas as pd
import pendulum
from calitp_data_infra.storage import ( # type: ignore
PartitionedGCSArtifact,
get_fs,
make_name_bq_safe,
)

CALITP_BUCKET__ELAVON_RAW = os.environ["CALITP_BUCKET__ELAVON_RAW"]
CALITP_BUCKET__ELAVON_PARSED = os.environ["CALITP_BUCKET__ELAVON_PARSED"]


def fetch_and_clean_from_gcs(fs):
"""
Download raw Elavon transaction records from GCS as a DataFrame and write out
in BigQuery-ready JSONL format after cleaning
"""

all_rows = pd.DataFrame()

# List raw files available from GCS
file_and_dir_list = fs.ls(f"{CALITP_BUCKET__ELAVON_RAW}/", detail=False)
dir_list = [x for x in file_and_dir_list if fs.isdir(x)]

# Drill down to the latest export (folders are "ts=" format)
target_dir = max(dir_list)
file_list = fs.ls(f"{target_dir}/", detail=False)
SorenSpicknall marked this conversation as resolved.
Show resolved Hide resolved

file_list = [x for x in file_list if fs.isfile(x)]
for file in file_list:
print(f"Processing file {file}")

# Save each file locally to read into Pandas
if not os.path.exists("transferred_files"):
os.mkdir("transferred_files")
local_path = f"transferred_files/{file}"
fs.get(file, local_path)

if all_rows.empty:
all_rows = pd.read_csv(
local_path, delimiter="|", dtype=str
) # Read from local version
else:
all_rows = pd.concat([all_rows, pd.read_csv(local_path, delimiter="|")])

extract = ElavonExtract(
filename="transactions.jsonl.gz",
)

if all_rows.empty:
return extract

cleaned_df = all_rows.rename(make_name_bq_safe, axis="columns")
extract.data = cleaned_df

return extract


class ElavonExtract(PartitionedGCSArtifact):
bucket: ClassVar[str] = CALITP_BUCKET__ELAVON_PARSED
table: ClassVar[str] = "transactions"
execution_ts: pendulum.DateTime = pendulum.now()
dt: pendulum.Date = execution_ts.date()
partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]
data: Optional[pd.DataFrame]

class Config:
arbitrary_types_allowed = True

def save_to_gcs(self, fs):
self.save_content(
fs=fs,
content=gzip.compress(
self.data.to_json(
orient="records", lines=True, default_handler=str
).encode()
),
exclude={"data"},
)


def process_elavon_data_to_jsonl(**kwargs):
fs = get_fs()
extract = fetch_and_clean_from_gcs(fs)

if extract.data is None:
print("No extracts were found in GCS")
return
if extract.data.empty:
print("All extracts found in GCS were empty")
return

extract.save_to_gcs(fs=fs)


if __name__ == "__main__":
process_elavon_data_to_jsonl()
16 changes: 16 additions & 0 deletions airflow/dags/sync_elavon/METADATA.yml
@@ -0,0 +1,16 @@
description: "Load raw files from Elavon SFTP server to GCS"
schedule_interval: "0 0 * * *"
tags:
- all_gusty_features
default_args:
owner: airflow
depends_on_past: False
start_date: "2023-03-22"
catchup: False
email:
- "soren.s@jarv.us"
email_on_failure: True
pool: default_pool
concurrency: 50
wait_for_defaults:
timeout: 3600
63 changes: 63 additions & 0 deletions airflow/dags/sync_elavon/elavon_to_gcs_raw.py
@@ -0,0 +1,63 @@
# ---
# python_callable: mirror_raw_files_from_elavon
# provide_context: true
# ---
import os

import paramiko
import pendulum
from calitp_data_infra.auth import get_secret_by_name
from calitp_data_infra.storage import get_fs

CALITP__ELAVON_SFTP_HOSTNAME = os.environ["CALITP__ELAVON_SFTP_HOSTNAME"]
CALITP__ELAVON_SFTP_PORT = os.environ["CALITP__ELAVON_SFTP_PORT"]
CALITP__ELAVON_SFTP_USERNAME = os.environ["CALITP__ELAVON_SFTP_USERNAME"]
CALITP__ELAVON_SFTP_PASSWORD = get_secret_by_name("CALITP__ELAVON_SFTP_PASSWORD")
CALITP_BUCKET__ELAVON_RAW = os.environ["CALITP_BUCKET__ELAVON_RAW"]


def mirror_raw_files_from_elavon():
"""
Download Elavon transaction records from SFTP and write raw files to GCS for
further processing in another job
"""

# Establish connection to SFTP server
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=CALITP__ELAVON_SFTP_HOSTNAME,
port=CALITP__ELAVON_SFTP_PORT,
username=CALITP__ELAVON_SFTP_USERNAME,
password=CALITP__ELAVON_SFTP_PASSWORD,
)

# Create SFTP client and navigate to data directory
sftp_client = client.open_sftp()
sftp_client.chdir("/data")

# Initialize GCS connection
fs = get_fs()

# Initiailize extract time used by all files
ts = pendulum.now().to_iso8601_string()

for file in [x for x in sftp_client.listdir() if x.endswith(".zip")]:
print(f"Processing file {file}")

# Save to local directory for mirrored transfer to GCS
if not os.path.exists("transferred_files"):
os.mkdir("transferred_files")
local_path = f"transferred_files/{file}"
sftp_client.get(file, local_path)

# We put file by file because recursively putting the directory causes relative
# filepath issues
fs.put(
lpath=f"transferred_files/{file}",
rpath=f"{CALITP_BUCKET__ELAVON_RAW}/ts={ts}/",
)


if __name__ == "__main__":
mirror_raw_files_from_elavon()
7 changes: 7 additions & 0 deletions airflow/docker-compose.yaml
Expand Up @@ -79,6 +79,8 @@ x-airflow-common:
CALITP_BUCKET__AGGREGATOR_SCRAPER: "gs://test-calitp-aggregator-scraper"
CALITP_BUCKET__AIRTABLE: "gs://test-calitp-airtable"
CALITP_BUCKET__DBT_ARTIFACTS: "gs://test-calitp-dbt-artifacts"
CALITP_BUCKET__ELAVON_RAW: "gs://test-calitp-elavon-raw"
CALITP_BUCKET__ELAVON_PARSED: "gs://test-calitp-elavon-parsed"
CALITP_BUCKET__GTFS_DOWNLOAD_CONFIG: "gs://test-calitp-gtfs-download-config"
CALITP_BUCKET__GTFS_RT_RAW: "gs://test-calitp-gtfs-rt-raw-v2"
CALITP_BUCKET__GTFS_RT_PARSED: "gs://test-calitp-gtfs-rt-parsed"
Expand All @@ -105,6 +107,11 @@ x-airflow-common:
SENTRY_DSN: "https://e498431022154366b0ff8b71cf2d93e0@sentry.k8s.calitp.jarv.us/2"
SENTRY_ENVIRONMENT: "development"

# Connection variables for Elavon SFTP server
CALITP__ELAVON_SFTP_HOSTNAME: "34.145.56.125"
CALITP__ELAVON_SFTP_PORT: "2200"
CALITP__ELAVON_SFTP_USERNAME: "elavon"

volumes:
# Note that in cloud composer, folders like dags are not in AIRFLOW_HOME
# but nested in a folder named gcs
Expand Down
61 changes: 61 additions & 0 deletions warehouse/models/staging/payments/elavon/_elavon.yml
@@ -0,0 +1,61 @@
version: 2

sources:
- name: elavon_external_tables
description: Hive-partitioned external tables reading Elavon transactions from GCS.
database: "{{ env_var('CALITP_DBT_SOURCE_DATABASE', var('DEFAULT_SOURCE_DATABASE')) }}"
schema: external_elavon
tables:
- name: transactions
description: Transactions processed by Elavon

models:
- name: stg_elavon__transactions
description: Transactions processed by Elavon
columns:
- name: payment_reference
- name: payment_date
- name: account_number
- name: routing_number
- name: fund_amt
- name: batch_reference
- name: batch_type
- name: customer_batch_reference
- name: customer_name
- name: merchant_number
- name: external_mid
- name: store_number
- name: chain
- name: batch_amt
- name: amount
- name: surchg_amount
- name: convnce_amt
- name: card_type
- name: charge_type
- name: charge_type_description
- name: card_plan
- name: card_no
- name: chk_num
- name: transaction_date
- name: settlement_date
- name: authorization_code
- name: chargeback_control_no
- name: roc_text
- name: trn_aci
- name: card_scheme_ref
- name: trn_ref_num
tests:
- unique
- name: settlement_method
- name: currency_code
- name: cb_acq_ref_id
- name: chgbk_rsn_code
- name: chgbk_rsn_desc
- name: mer_ref
- name: purch_id
- name: cust_cod
- name: trn_arn
- name: term_id
- name: ent_num
- name: dt
- name: execution_ts