Objective : Fetches all the CTE data individually and dumps as a csv file

How to use : 
    
    1. Give the Query in the full_query variable. 
    2. Remove any variables in the query
    3. Give all the query variable values into the python variables with same name
    4. Make the Final select statement as a CTE and leave it
    5. Enter the CTEs names in order in a list called cte_list
    6. Enter the CTEs for which you want the dumps in a list called cte_dump_list
    7. Enter the dir_name 
    

In [1]:
full_query = """
-- Open cases 

WITH 

-- Get all the cases raised in the last 6 weeks. Making sure to take the latest opportunity for the same year for the same partner.
-- Taking only first case for a given opp in a given day 

cases AS (
  SELECT c.id AS case_id  
    , c.sf_created_at AS case_created_time
    , cc.msisdn
    , DATE_TRUNC('week', c.sf_created_at)::DATE AS case_week_start
  FROM sf_cases c LEFT JOIN sf_case_cc_details cc 
    ON c.id = cc.case_cc_id
  WHERE cc.issue_v2 = 'Insurance Purchase related' 
  AND c.sf_created_at >= (DATE_TRUNC('week', CURRENT_DATE)::DATE - 42)
    AND c.sf_created_at < DATE_TRUNC('week', CURRENT_DATE)::DATE)
    ,


opportunities AS (
  SELECT o.id AS opportunity_id
    , o.createddate as opp_createddate
    , o.sfid
    , o.account__c
    , asd.msisdn
    , asd.geo_region_id
  FROM salesforce.opportunity__c o LEFT JOIN public.sf_accounts a
      ON o.account__c = a.salesforce_id 
    LEFT JOIN public.sf_account_supply_details asd
      ON a.id = asd.account_supply_id  
  WHERE o.recordtypeid = '0122x000000XxcOAAS'
  AND asd.msisdn IN (SELECT DISTINCT msisdn FROM cases)

  
 )

, opps_cases AS (
  SELECT 
  DISTINCT ON (o.opportunity_id, c.case_created_time :: DATE)
      o.opportunity_id
    , o.opp_createddate 
    , o.sfid
    , o.account__c
    , c.case_id
    , c.case_created_time
    , c.case_week_start
    , o.geo_region_id
  FROM cases c  INNER JOIN opportunities o 
    ON c.msisdn = o.msisdn
    
    WHERE (o.opp_createddate >= c.case_created_time - INTERVAL '120 days') AND ( o.opp_createddate <= c.case_created_time )
    
    ORDER BY -- For selecting only first case per day for each opportunity 
    o.opportunity_id, 
    c.case_created_time :: DATE, 
    c.case_created_time ASC 
)

, tasks AS (
  SELECT id AS task_id
    , sf_created_at AS task_created_time
    , parent_id
    , disposition
    , DATE_TRUNC('week', sf_created_at)::DATE AS task_week_start
  FROM sf_tasks
  -- All insurance related tasks
  WHERE subject = 'Vehicle Insurance Disposition Task'
  AND parent_id IN (SELECT DISTINCT sfid FROM opportunities)
    -- AND created_at >= (DATE_TRUNC('week', CURRENT_DATE)::DATE - 42)
    -- AND created_at < DATE_TRUNC('week', CURRENT_DATE)::DATE
    
    )


, opps_cases_tasks AS (
  SELECT oc.opportunity_id
    , oc.case_id
    , oc.case_created_time
    , oc.case_week_start
    , oc.opp_createddate
    , oc.geo_region_id
    --, oc.vehicle_type
    , oc.account__c
    --, oc.stage_name__c
    -- , MIN(t.task_created_time) FILTER (WHERE ((t.disposition NOT IN ('RNR', 'Not Connected')) 
    --   AND ((t.task_week_start < oc.case_week_start) OR (oc.case_week_start IS NULL)))) AS first_connected_time
    -- , MIN(t.task_created_time) FILTER (WHERE ((t.task_week_start < oc.case_week_start) OR (oc.case_week_start IS NULL))) AS first_attempted_time
    , MIN(t.task_created_time) FILTER (WHERE ((t.disposition NOT IN ('RNR', 'Not Connected')) 
      AND (t.task_week_start >= oc.case_week_start) AND (t.task_created_time >= oc.case_created_time))) AS first_connected_time_after_interest
    , MIN(t.task_created_time) FILTER (WHERE ((t.task_week_start >= oc.case_week_start) AND (t.task_created_time >= oc.case_created_time))) AS first_attempt_time_after_interest
  FROM opps_cases oc LEFT JOIN tasks t 
    ON oc.sfid = t.parent_id
  GROUP BY 1, 2, 3, 4, 5, 6, 7
)

, purchases AS (
  SELECT account__c
  , purchase_date__c
  , id AS purchase_id
  FROM salesforce.vehicle_insurance__c
  WHERE ((DATE_TRUNC('YEAR', purchase_date__c)) = (DATE_TRUNC('YEAR', createddate)))
)

, combined_data AS (
  SELECT DISTINCT ON (oct.account__c, DATE_TRUNC('year', oct.opp_createddate)) oct.* -- FOR EVERY PARNTER TAKE THE LATEST OPP
    , p.purchase_date__c
    , p.purchase_id
    , DATE_TRUNC('week', first_connected_time_after_interest)::DATE AS first_connect_after_interest_week_start
    , DATE_TRUNC('week', first_attempt_time_after_interest)::DATE AS first_attempt_after_interest_week_start
    -- If purchased insurance with us last year then Renewal, else New
    , CASE WHEN p.purchase_date__c IS NOT NULL THEN 'Renewal' ELSE 'New' END AS insurance_type
  FROM opps_cases_tasks oct LEFT JOIN purchases p 
    ON ((oct.account__c = p.account__c) AND (DATE_TRUNC('YEAR', oct.opp_createddate) = (DATE_TRUNC('YEAR', p.purchase_date__c) + INTERVAL '1 YEAR')))
  ORDER BY oct.account__c, DATE_TRUNC('year', oct.opp_createddate), oct.opp_createddate DESC
)

, geo_regions AS (
  SELECT * 
  FROM public.dblink( 
    'host=oms-prod-psql-replica.porter.in port=5432 dbname=porter_order_production user=bhanu_mittal password=aZPu95Bwuv'
    , 'SELECT id, name FROM geo_regions'
  ) AS tb (
    id INT 
    , name TEXT 
  )
)

, final_data AS (
  SELECT cd.*
    , gr.name AS geo_region_name
  FROM combined_data cd LEFT JOIN geo_regions gr
    ON cd.geo_region_id = gr.id
)

, inbound_tat AS (
  SELECT 
    case_week_start, insurance_type -- MAKE IT CASE_CREATED_WEEK_START
    -- Average of First Connect Inbound TAT  
    , EXTRACT(EPOCH FROM AVG(first_connected_time_after_interest - case_created_time) FILTER (WHERE (case_created_time IS NOT NULL))) / 86400 AS inbound_tat
  FROM final_data
  GROUP BY 1,2
)

, no_task_inbound_cases_temp AS (

  SELECT case_week_start
  --, insurance_type
  , COUNT(*) FILTER (WHERE first_attempt_time_after_interest IS NULL ) AS no_task_cases_count,
  COUNT(*) as total_cases_count
  FROM combined_data
  GROUP BY 1)
  
, no_task_inbound_cases AS (

  SELECT 
    case_week_start
    --, insurance_type
    , no_task_cases_count as open_cases_count
    , no_task_cases_count :: float/ ( total_cases_count )  *100 as Percentage
    
     FROM no_task_inbound_cases_temp

)"""


 

# Imports

In [2]:
from tqdm import tqdm

In [3]:
import re

In [4]:
import sys
helper_path ="/Users/porteruser/Documents/libraries/analytics-utils-master"
sys.path.append(helper_path)


import pandas as pd
import os
import psycopg2 as pg2
import time
import warnings
from datetime import datetime, timedelta
import db_helper.utils as db_utils

warnings.filterwarnings('ignore')

# Inputs

In [5]:
cases_from_date = pd.to_datetime('2022-01-03')
cases_to_date = pd.to_datetime('2022-01-09')

In [6]:
inputs_dict = {"{{cases_from_date}}":cases_from_date,"{{cases_to_date}}":cases_to_date }

In [7]:
# dir_name = "TAT First attempt - Time series"

dir_name = "open_cases"

In [8]:
# def create_inputs_dict(inputs_list):
    

In [9]:
inputs_list = list(set(re.findall("{.*}",full_query)))

In [10]:
def clean_full_query(full_query):
    for each_input in inputs_list:
        new_value = f"'{inputs_dict[each_input]}' :: DATE "
        full_query = full_query.replace(each_input,new_value)
        full_query = f"{full_query}"
    return full_query

In [11]:
# full_query = clean_full_query(full_query)

In [12]:
#Enter in Order
cte_list = ['cases','opportunities','opps_cases','tasks','opps_cases_tasks','purchases','combined_data','geo_regions','final_data','inbound_tat','no_task_inbound_cases_temp','no_task_inbound_cases']

#Inbound TAT First Attempt
# cte_list = ['cases','opportunities','opps_cases','tasks','opps_cases_tasks','combined_data','inbound','final_data','output']

#TAT First attempt - Time series
# cte_list = [ 'cases', 'opportunities', 'opps_cases', 'tasks', 'opps_cases_tasks', 'purchases', 'active_partners', 'combined_data', 'geo_regions', 'final_data', 'inbound_tat']




In [13]:
cte_dump_list  = ['cases','opportunities','opps_cases','opps_cases_tasks','combined_data','final_data'
                  ,'no_task_inbound_cases_temp','no_task_inbound_cases']



In [14]:
[f"{cte} AS" in full_query  for cte in cte_list ]

[True, True, True, True, True, True, True, True, True, True, True, True]

In [15]:
if sum([f"{cte} AS" in full_query  for cte in cte_list ]) != len(cte_list):
    print("Proper CTEs",end = ': ')
    print(sum([f"{cte} AS" in full_query  for cte in cte_list ]))
    
    print("Total CTEs", end = ": ")
    print(len(cte_list))
    
    print(1/0)

In [16]:
cte_location_dict= {}
for cte in cte_list:
    cte_location = re.search(f"{cte} AS", full_query)
    cte_location_dict.update({cte:cte_location})

In [17]:
cte_query_list = []
cte_query_dict = {}

In [18]:
for indx,cte in enumerate(cte_list):
    if indx != len(cte_list)-1:
        next_cte_start = cte_location_dict[cte_list[indx + 1]].start()
    else :
        next_cte_start = len(full_query)
    cur_cte_start = cte_location_dict[cte_list[indx]].start()
    
    current_cte_query = full_query[cur_cte_start:next_cte_start]
    current_cte_query = current_cte_query.strip().strip(',') #To remove trailing spaces and then , used to add new CTE
    
    cte_query_list.append(current_cte_query)
    cte_query_dict.update({cte:current_cte_query})

# Making individual CTE Queries

In [19]:
def make_full_cte_query(cumulative_cte,cte_name):
    return "WITH " + cumulative_cte + f"\n\n SELECT * FROM {cte_name}"

In [20]:
def get_cumulative_cte(indx):
    cumulative_cte = ''
    for each_cte in cte_list[:indx+1]:
        cumulative_cte = cumulative_cte + '\n, '+ cte_query_dict[each_cte]
    return cumulative_cte.strip().strip(',')

In [21]:
full_cte_query_list = []
full_cte_query_dict = {}

for indx, cte_name in enumerate(cte_list)  : 
    cumulative_cte = get_cumulative_cte(indx)
    result = make_full_cte_query(cumulative_cte ,cte_name )
    full_cte_query_list.append(result)
    full_cte_query_dict.update({cte_name:result})

# Fetching individual CTE Queries

In [22]:
fetched_cte_data = {}

In [23]:
count = 0
file_name = ''
def make_dir(dir_name):
    global count
    global file_name
    try:
        os.mkdir(dir_name)
        file_name = dir_name
    except: #CHANGE THIS TO IF ALREADY EXISTS
        count = count + 1
        dir_name = dir_name + "_"+ str(count)
        make_dir(dir_name)

In [None]:
make_dir(dir_name)

for cte in cte_dump_list:
    print(cte)
    print(pd.to_datetime(datetime.now()))
    data = db_utils.fetch_data(full_cte_query_dict[cte], connection = 'sfms')
    dump_path = os.path.join(file_name,cte) + ".csv"
    data.to_csv(dump_path,index = False)
    print(f"{cte} dumped at {dump_path}",end = "\n\n")
    fetched_cte_data.update({cte:data})
    
print("DONE")

cases
2022-01-21 22:32:43.373092
Query run time in seconds : 1.6406538486480713
cases dumped at open_cases_1_2/cases.csv

opportunities
2022-01-21 22:32:45.066275
Query run time in seconds : 379.4163599014282
opportunities dumped at open_cases_1_2/opportunities.csv

opps_cases
2022-01-21 22:39:04.525823
