In [1]:
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from sqlalchemy import engine
import pandas as pd

import sys
sys.path.append('../PyScripts/')
import falcon_env

In [2]:
conn = engine.create_engine(falcon_env.get_url())

In [3]:
orgs = conn.execute('SELECT id FROM ci_api_organization LIMIT 5').fetchall()
orgs

[(UUID('384e6be7-05c9-4e0c-9aba-1ed4c55d1141'),),
 (UUID('fff4e6af-dbbb-47f5-a5ab-4748bf1732f9'),),
 (UUID('48ec9852-a07f-47d5-8421-ee4a55ed547a'),),
 (UUID('83072de8-c556-4fc6-82ab-f738d716f3e5'),),
 (UUID('2a47ef58-cedd-47cf-9539-2795dcdb9dd4'),)]

In [4]:
for i in orgs:
    print(i[0])

384e6be7-05c9-4e0c-9aba-1ed4c55d1141
fff4e6af-dbbb-47f5-a5ab-4748bf1732f9
48ec9852-a07f-47d5-8421-ee4a55ed547a
83072de8-c556-4fc6-82ab-f738d716f3e5
2a47ef58-cedd-47cf-9539-2795dcdb9dd4


In [31]:
archive_orgs = conn.execute('''SELECT id from ci_api_organization
    WHERE  disabled
      AND  archive_status is NULL''').fetchall()
[str(i[0]) for i in archive_orgs]

['384e6be7-05c9-4e0c-9aba-1ed4c55d1141',
 'fff4e6af-dbbb-47f5-a5ab-4748bf1732f9',
 '48ec9852-a07f-47d5-8421-ee4a55ed547a',
 '83072de8-c556-4fc6-82ab-f738d716f3e5',
 '2a47ef58-cedd-47cf-9539-2795dcdb9dd4',
 '63726a09-a480-4643-9f80-7753519411ad',
 '16e62cf9-9f49-4bf7-a97d-ddeaf4f9ab4a',
 '8461faa3-eba3-4c43-858f-131c84cff0a7',
 '7f5b008b-0482-4513-88a4-b3a1c774a674',
 '6ce77abe-90cb-4141-abae-bbf54cb76ed9',
 '62899d06-8f2b-4ee0-912e-8b458decdd13',
 '0d5819fe-aaab-4815-8ab8-270984a77ce4',
 '544beffa-02d6-4e94-9c01-998cc54dd477',
 'e2b636ea-b152-47f7-b4a3-0660c631aa42',
 '47e998fb-42ba-49ab-9e45-d5a8d1e0994e',
 '056fc9c5-0c5b-4f38-95c3-8d732f9321fe',
 '7fa225eb-8848-4849-8d63-c51339d7b491',
 'dd5178ae-6067-42a5-9cd3-3623eaab1398',
 '4cdc9c6b-852a-4a43-a2e7-e2ebcfa4f274',
 'ff36fedc-62b6-4909-b251-bf88cdd5e5d4',
 '9acf1aa8-1fc8-4321-ba35-70d42369282f',
 '8d586e14-5940-462e-8a0b-c1ccccaee9bc',
 '199d3807-030f-4188-98a5-0ee5ff9a9e25',
 'efa4ae58-a836-40c8-8ec8-52616cc137e2',
 '8f2a35bd-2cc5-

In [14]:
fields = conn.execute(
'''
-- SELECT ARRAY_AGG(f)
-- FROM (
   SELECT DISTINCT jsonb_object_keys(custom_data) f
   FROM ci_api_subscriber
   WHERE org_id='03372cc1-28c7-4213-80d2-32880d22ba36'
-- ) u;

'''
).fetchall()

In [15]:
custom_fields = [i[0] for i in fields]
print(''.join([f"custom_data ->> '{i}' as {i}, \n" for i in custom_fields]))

custom_data ->> 'address' as address, 
custom_data ->> 'age' as age, 
custom_data ->> 'age_group' as age_group, 
custom_data ->> 'avg_amt' as avg_amt, 
custom_data ->> 'cd' as cd, 
custom_data ->> 'ci_segment' as ci_segment, 
custom_data ->> 'city' as city, 
custom_data ->> 'county' as county, 
custom_data ->> 'dob' as dob, 
custom_data ->> 'file_source' as file_source, 
custom_data ->> 'first_name' as first_name, 
custom_data ->> 'first_name_sl' as first_name_sl, 
custom_data ->> 'gender' as gender, 
custom_data ->> 'hd' as hd, 
custom_data ->> 'import_tags' as import_tags, 
custom_data ->> 'last_name' as last_name, 
custom_data ->> 'mrc_amt' as mrc_amt, 
custom_data ->> 'mrc_date' as mrc_date, 
custom_data ->> 'original_source' as original_source, 
custom_data ->> 'phone_number' as phone_number, 
custom_data ->> 'rental_id' as rental_id, 
custom_data ->> 'sd' as sd, 
custom_data ->> 'state' as state, 
custom_data ->> 'total_contributions' as total_contributions, 
custom_data ->> 'war

In [21]:
def get_custom_fields(org_id):
    conn = engine.create_engine(falcon_env.get_url())
    fields = conn.execute(
        f'''
           SELECT DISTINCT jsonb_object_keys(custom_data)
           FROM ci_api_subscriber
           WHERE org_id='{org_id}';
        '''
    ).fetchall()
    custom_fields = [i[0] for i in fields]
    return custom_fields

def create_query(org_id):
    custom_fields = get_custom_fields(org_id)
    part_1 = 'SELECT email, \n'
    part_2 = ''
    for i in custom_fields:
        part_2 += f'''custom_data ->> '{i}' as "{i}", \n'''
    part_3 = f"last_open, \nlast_click,\nlast_email \nFROM ci_api_subscriber\nWHERE org_id = '{org_id}';"
    return(part_1+part_2+part_3)

In [22]:
query = create_query('8f2a35bd-2cc5-4dc5-8541-03c3b3331dc8')

In [23]:
print(query)

SELECT email, 
custom_data ->> '2020_un-seg' as "2020_un-seg", 
custom_data ->> 'addresd' as "addresd", 
custom_data ->> 'address' as "address", 
custom_data ->> 'address_1' as "address_1", 
custom_data ->> 'address_2' as "address_2", 
custom_data ->> 'age' as "age", 
custom_data ->> 'attend_church_often' as "attend_church_often", 
custom_data ->> 'awr_seg' as "awr_seg", 
custom_data ->> 'cd' as "cd", 
custom_data ->> 'church_donor' as "church_donor", 
custom_data ->> 'ci_segment' as "ci_segment", 
custom_data ->> 'city' as "city", 
custom_data ->> 'contact_id' as "contact_id", 
custom_data ->> 'county' as "county", 
custom_data ->> 'cruz_favorability' as "cruz_favorability", 
custom_data ->> 'data_source' as "data_source", 
custom_data ->> 'desantis_favorable' as "desantis_favorable", 
custom_data ->> 'desantis_support_head_to_head' as "desantis_support_head_to_head", 
custom_data ->> 'desantis_support_three_way' as "desantis_support_three_way", 
custom_data ->> 'dob_year' as "dob_yea

In [25]:
df = pd.read_sql(query, con = conn)
df.to_csv(
    '../csv/8f2a35bd-2cc5-4dc5-8541-03c3b3331dc8_20200923.csv.gz',
    index=False,
    compression = "gzip")

KeyboardInterrupt: 

In [28]:
df.to_csv(
    '../csv/8f2a35bd-2cc5-4dc5-8541-03c3b3331dc8_20200923.csv.gz',
    index=False,
    compression = "gzip")

In [29]:
df.shape

(6134396, 87)

In [18]:
def fetch_subscriber_data(org_id, custom_data, connection):
    print(f'{org_id}: Fetching subscriber data...')
    custom_selects = ''.join([f"custom_data ->> '{i}' as {i}, \n        " for i in custom_data])

    query = f'''
    SELECT 
        email,
        {custom_selects}
        last_open,
        last_delivered,
        last_email
    FROM ci_api_subscriber
    WHERE org_id = '{org_id}'
    '''
    print(query)

In [19]:
fetch_subscriber_data('03372cc1-28c7-4213-80d2-32880d22ba36', custom_fields, 'con')

03372cc1-28c7-4213-80d2-32880d22ba36: Fetching subscriber data...

    SELECT 
        email,
        custom_data ->> 'address' as address, 
        custom_data ->> 'age' as age, 
        custom_data ->> 'age_group' as age_group, 
        custom_data ->> 'avg_amt' as avg_amt, 
        custom_data ->> 'cd' as cd, 
        custom_data ->> 'ci_segment' as ci_segment, 
        custom_data ->> 'city' as city, 
        custom_data ->> 'county' as county, 
        custom_data ->> 'dob' as dob, 
        custom_data ->> 'file_source' as file_source, 
        custom_data ->> 'first_name' as first_name, 
        custom_data ->> 'first_name_sl' as first_name_sl, 
        custom_data ->> 'gender' as gender, 
        custom_data ->> 'hd' as hd, 
        custom_data ->> 'import_tags' as import_tags, 
        custom_data ->> 'last_name' as last_name, 
        custom_data ->> 'mrc_amt' as mrc_amt, 
        custom_data ->> 'mrc_date' as mrc_date, 
        custom_data ->> 'original_source' as original_so