In [0]:
import importlib
import pandas as pd
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, DoubleType
import time
from datetime import datetime

In [0]:
%pip install pysurveycto
import pysurveycto as pcto

## Connect to SurveyCTO

In [0]:
# Declare one Databricks text widget per SurveyCTO connection setting (empty default,
# label identical to the widget name), read the three values back, and build the
# client object that the following cells use.
# NOTE(review): the password is supplied through a plain text widget — consider a
# Databricks secret scope instead; confirm with the notebook owners.
widget_names = ("SURVEY_CTO_SERVER", "SURVEY_CTO_USERNAME", "SURVEY_CTO_PASSWORD")

for widget_name in widget_names:
    dbutils.widgets.text(widget_name, "", widget_name)

# Unpacking follows the order of widget_names: server, then username, then password.
server_name, username, password = [
    dbutils.widgets.get(widget_name) for widget_name in widget_names
]

scto = pcto.SurveyCTOObject(server_name, username, password)

## Get All Form IDs

In [0]:
# Seconds to pause once the form list has been fetched.
# NOTE(review): presumably a cooldown for SurveyCTO API rate limiting — TODO confirm
# before shortening or removing it.
PAUSE_AFTER_LISTING_SECONDS = 301

# Fetch the form list from the SurveyCTO client and keep only each entry's 'id'.
all_forms = scto.list_forms()
all_ids = [entry['id'] for entry in all_forms]

time.sleep(PAUSE_AFTER_LISTING_SECONDS)

## Pull & Write All SurveyCTO Data

In [0]:
# Write to prd_mega
def write_to_catalog(TABLE, df, catalog="prd_mega", schema="ssurve38"):
    """Overwrite a catalog table with the contents of a pandas DataFrame.

    The DataFrame is converted to a Spark DataFrame, the session is switched to
    ``catalog.schema``, and the data is saved under the name ``TABLE`` in
    overwrite mode with ``overwriteSchema`` enabled, so both the rows and the
    column layout of an existing table are replaced.

    Args:
        TABLE: Name of the destination table, passed to ``saveAsTable`` as-is.
        df: Data to upload; handed directly to ``spark.createDataFrame``.
        catalog: Catalog to switch to before writing. Defaults to ``"prd_mega"``.
        schema: Schema within ``catalog`` to switch to. Defaults to ``"ssurve38"``.

    Returns:
        None.

    Raises:
        Any exception raised by Spark while converting or writing is propagated
        unchanged; callers decide how to report it.

    Notes:
        * ``USE`` changes the session's current catalog/schema and is not
          reverted afterwards.
        * ``catalog`` and ``schema`` are interpolated into the SQL statement
          without escaping; pass trusted identifiers only.
    """
    # Convert first so that a conversion failure leaves the session's current
    # catalog/schema untouched.
    sdf = spark.createDataFrame(df)

    spark.sql(f"USE {catalog}.{schema}")

    (
        sdf.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(TABLE)
    )

In [0]:
# Show the collected form IDs as this cell's output for a quick visual check.
all_ids

In [0]:
# To test against a smaller subset, list specific form IDs here, e.g.
# good_ids = ['dime_stc_application_mar2024']
good_ids = []

# An empty list means "process every form ID collected earlier".
if not good_ids:
    good_ids = all_ids

num_pass = 0
num_fail = 0

# Lower bound passed as oldest_completion_date for every form. It is the same for
# all forms, so it is built once instead of on each iteration.
# NOTE(review): presumably chosen early enough to include every submission — confirm.
last_extraction_date = datetime(2004, 1, 1)
# ISO-8601 string form of the cutoff. Not used by the request below; kept because
# later cells may reference it — TODO remove if nothing does.
date_param = last_extraction_date.isoformat()

for form_id in good_ids:
    # Stage 1: download the form's submissions and load them into pandas.
    # NOTE(review): a per-form time.sleep(301) used to sit after the download and
    # was disabled; re-enable it if the API starts rejecting requests — TODO confirm.
    try:
        form_json = scto.get_form_data(
            form_id,
            format='json',
            oldest_completion_date=last_extraction_date,
        )
        form_df = pd.DataFrame(form_json)
    except Exception as exc:
        # `except Exception` (not a bare except) lets KeyboardInterrupt/SystemExit
        # stop the run; the failure is reported instead of being counted silently.
        num_fail += 1
        print(form_id, ': FAIL (Data Pull)', '-', repr(exc))
        continue

    # Stage 2: upload to the catalog, using the form ID as the table name.
    try:
        write_to_catalog(form_id, form_df)
    except Exception as exc:
        num_fail += 1
        print(form_id, ': FAIL (DB Upload)', '-', repr(exc))
    else:
        print(form_id, ': PASS')
        num_pass += 1

print('num_passed: ',num_pass)
print('num_failed: ',num_fail)