In [None]:
import requests
import pandas as pd
import json
import config
import msal
from datetime import datetime, timedelta
from dateutil import parser

In [153]:
def powerBIPostDataset(group_id, data_types, token):
    # Create the URL endpoint for the REST API call.
    powerBIApiUrl = "https://api.powerbi.com/v1.0/myorg/groups/" + group_id + "/datasets"
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    # Post the data to the Power BI API endpoint
    response = requests.post(powerBIApiUrl, headers=headers, json=data_types)
    return response

def powerBIDeleteDataset(group_id, dataset_id, token):
    powerBIApiURl = f"https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}"
    headers = {'Authorization': 'Bearer ' + token,
                'Content-Type': 'application/json'}
    response = requests.delete(powerBIApiURl, headers=headers)
    return response

def powerBIGetDatasetID(group_id, token):
    powerBIApiUrl = "https://api.powerbi.com/v1.0/myorg/groups/" + group_id + "/datasets"
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    response = requests.get(powerBIApiUrl, headers=headers)
    response_json = response.json()
    dataset_id = response_json['value'][0]['id']
    dataset_name = response_json['value'][0]['name']
    return dataset_id, dataset_name

def powerBIPutTable(group_id, dataset_id, put_table_name, put_request, token):
    # URL encode the table name
    put_table_name = put_table_name.replace(" ", "%20")
    # Construct the URL
    powerBIAPIUrl = f"https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/tables/{put_table_name}"
    headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json'
    }
    response = requests.put(powerBIAPIUrl, headers=headers, json=put_request)
    return response
    
def powerBIPostRows(group_id, dataset_id, table_name, rows_request, token):
    table_name = table_name.replace(" ", "%20")
    powerBIApiUrl = f"https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/tables/{table_name}/rows"
    headers = {'Authorization': 'Bearer ' + token,
            'Content-Type': 'application/json'}
    # Post the rows
    response = requests.post(powerBIApiUrl, headers=headers, json=rows_request)
    return response

In [165]:
client_id = ""
client_secret = ""
tenant_id = ""

url2 = f"https://login.microsoftonline.com/{tenant_id}/"

app = msal.ConfidentialClientApplication(client_id=client_id, client_credential=client_secret, authority = url2)
scope = ["https://analysis.windows.net/powerbi/api/.default"]

# Acquire token using client credentials (i.e., service principal)
result = app.acquire_token_for_client(scopes=scope)
token = result['access_token']

In [None]:
df = pd.read_csv("modified_lighter_311.csv")
df.head()

In [None]:
# Update zipcode column to be a string and pad with leading zeros
df['location_zipcode'] = df['location_zipcode'].astype(str).str.zfill(5)
#df['latitude'] = df['latitude'].astype(str)
#df['longitude'] = df['longitude'].astype(str)

In [None]:
# drop submitted_photo and closed_photo columns
df = df.drop(columns=['submitted_photo', 'closed_photo'])

In [None]:
def closure_reason(reason):
    # Right strip the reason
    reason = reason.strip()
    if 'resolved' in reason.lower():
        return 'resolved'
    elif 'noted' in reason.lower():
        return 'noted'
    elif 'automation' in reason.lower():
        return 'bulk item automation'
    elif 'duplicate' in reason.lower():
        return 'duplicate'
    elif 'invalid' in reason.lower() or "invaild" in reason.lower():
        return 'invalid'
    elif 'viocor' in reason.lower():
        return 'viocor'
    elif 'vioiss' in reason.lower():
        return  'vioiss'
    elif 'adclsd' in reason.lower():
        return 'adclsd'
    elif 'permis' in reason.lower():
        return 'permis'
    elif 'corr' in reason.lower():
        return 'corr'
    elif 'roa' in reason.lower() or "referred to external agency" in reason.lower() or "per " in reason.lower():
        return 'roa'
    elif "" == reason.split(":")[-1] or len(reason.split(":")[-1]) <=6:
        return 'empty'
    elif 'internal case' in reason.lower():
        return 'internal case'
    elif 'novio' in reason.lower():
        return 'novio'
    elif 'nobase' in reason.lower():
        return 'nobase'
    elif "noacc" in reason.lower() or "private" in reason.lower():
        return 'noacc'
    elif "CLOSED" in reason or "AVRS" in reason or "close" in reason.lower().split(":")[-1]:
        return 'closed'
    elif 'violfnd' in reason.lower():
        return 'violfnd'
    elif 'reinsp' in reason.lower():
        return 'reinsp'
    elif 'tfa' in reason.lower():
        return 'tfa'
    elif "e-form" in reason.lower() or "eform" in reason.lower() or "e form" in reason.lower():
        return 'eform'    
    elif "resubmitted" in reason.lower():
        return 'resubmitted'
    elif "cancel" in reason.lower():
        return 'cancel'
    elif "ticket" in reason.lower():
        return 'ticket'
    elif "tsop" in reason.lower():
        return 'tsop'
    else:
        return 'other'

# update closure_reason column
df['closure_reason'] = df['closure_reason'].apply(closure_reason)

In [None]:
# Convert open_dt, closed_dt, and sla_target_dt to datetime
df['open_dt'] = pd.to_datetime(df['open_dt'])
df['closed_dt'] = pd.to_datetime(df['closed_dt'])
df['sla_target_dt'] = pd.to_datetime(df['sla_target_dt'])

# convert resolution time to timedelta
df['resolution_time'] = df['closed_dt'] - df['open_dt']
df.loc[df['resolution_time'] < timedelta(0),'resolution_time'] = timedelta(seconds=0) # replace negative resolution times with 0
df['resolution_time'] = df['resolution_time'].dt.total_seconds() / 86400

# Convert open_dt to open_year, open_month, open_day, open_hour, open_minute, open_dayofweek, open_dayofyear, open_weekofyear, open_quarter
df['open_year'] = df['open_dt'].dt.year
df['open_month'] = df['open_dt'].dt.month
df['open_day'] = df['open_dt'].dt.day
df['open_hour'] = df['open_dt'].dt.hour
df['open_minute'] = df['open_dt'].dt.minute
df['open_dayofweek'] = df['open_dt'].dt.dayofweek
df['open_dayofyear'] = df['open_dt'].dt.dayofyear
df['open_quarter'] = df['open_dt'].dt.quarter

In [None]:
# Turn DF into a list of dictionaries
pbi_package = df.to_dict('records')

# Get column types
df_types = df.dtypes.to_dict()

In [None]:
for key in df_types:
    print(key, df_types[key])

In [133]:
pbi_type_map = {
    'object': 'string',
    'int64': 'Int64',
    'int32': 'Int64',
    'float64': 'Double',
    'bool': 'bool',
    'datetime64[ns]': 'Datetime'
}

# Convert the types
pbi_types = {}

for col in df_types.keys():
    pbi_types[col] = pbi_type_map[str(df_types[col])]

pbi_types

{'case_enquiry_id': 'Int64',
 'open_dt': 'Datetime',
 'sla_target_dt': 'Datetime',
 'closed_dt': 'Datetime',
 'on_time': 'string',
 'case_status': 'string',
 'closure_reason': 'string',
 'case_title': 'string',
 'subject': 'string',
 'reason': 'string',
 'type': 'string',
 'queue': 'string',
 'department': 'string',
 'fire_district': 'string',
 'pwd_district': 'string',
 'city_council_district': 'string',
 'police_district': 'string',
 'neighborhood': 'string',
 'neighborhood_services_district': 'string',
 'ward': 'string',
 'precinct': 'string',
 'location_zipcode': 'string',
 'latitude': 'Double',
 'longitude': 'Double',
 'source': 'string',
 'resolution_time': 'Double',
 'open_year': 'Int64',
 'open_month': 'Int64',
 'open_day': 'Int64',
 'open_hour': 'Int64',
 'open_minute': 'Int64',
 'open_dayofweek': 'Int64',
 'open_dayofyear': 'Int64',
 'open_quarter': 'Int64'}

In [134]:
dataset_build_request = {}
dataset_name = "311"
table_name = "base"
dataset_build_request['name'] = dataset_name
dataset_build_request['defaultMode'] = 'Push'
dataset_build_request['tables'] = [{'name': table_name, 'columns': []}]
# Populate columns
for col in pbi_types.keys():
    dataset_build_request['tables'][0]['columns'] += [{'name': col, 'dataType': pbi_types[col]}]

In [135]:
group_id = "f36042ed-0cc0-4837-87fc-856b4b4cc901"

# Delete the dataset if it already exists
try:
    dataset_id, dataset_name = powerBIGetDatasetID(group_id, token)
    response = powerBIDeleteDataset(group_id, dataset_id, token)
    print("Dataset Deleted")
except:
    print("Dataset does not exist")

Dataset does not exist


In [136]:
try:
    dataset_id, dataset_name = powerBIGetDatasetID(group_id, token)
    # Put the table
    put_request = dataset_build_request['tables'][0]
    put_table_name = put_request['name']
    powerBIPutTable(group_id, dataset_id, put_table_name, put_request, token)
    print("Table Updated")
except:
    # Post the dataset
    response = powerBIPostDataset(group_id, dataset_build_request, token)
    print("Table Posted")
    dataset_id, dataset_name = powerBIGetDatasetID(group_id, token)


Table Posted


In [137]:
# save df to csv
df.to_csv("modified_dates_311.csv", index=False)

In [138]:
# Apply pbi types to package
for col in pbi_types.keys():
    for row in pbi_package:
        if pbi_types[col] == 'Datetime':
            try:
                row[col] = row[col].strftime('%Y-%m-%dT%H:%M:%SZ')
            except:
                row[col] = None
        elif pbi_types[col] == 'Int64':
            try:
                row[col] = int(row[col])
            except:
                row[col] = 0

In [139]:
rows_request = {}
rows_request['rows'] = pbi_package
rows_request

{'rows': [{'case_enquiry_id': 101000295613,
   'open_dt': '2011-06-30T21:32:33Z',
   'sla_target_dt': '2011-07-14T21:32:32Z',
   'closed_dt': '2011-07-01T01:06:58Z',
   'on_time': 'ONTIME',
   'case_status': 'Closed',
   'closure_reason': 'resolved',
   'case_title': 'Street Light Outages',
   'subject': 'Public Works Department',
   'reason': 'Street Lights',
   'type': 'Street Light Outages',
   'queue': 'PWDx_Street Light Outages',
   'department': 'PWDx',
   'fire_district': '8',
   'pwd_district': '07',
   'city_council_district': '3',
   'police_district': 'B3',
   'neighborhood': 'Greater Mattapan',
   'neighborhood_services_district': '7',
   'ward': 'Ward 17',
   'precinct': '1714',
   'location_zipcode': '02126',
   'latitude': 42.27154416019889,
   'longitude': -71.07722102041515,
   'source': 'Employee Generated',
   'resolution_time': 0.14890046296296297,
   'open_year': 2011,
   'open_month': 6,
   'open_day': 30,
   'open_hour': 21,
   'open_minute': 32,
   'open_dayofwe

In [140]:
import math

target_set = set()
def check_for_invalid_floats(data):
    for item in data:
        for key, value in item.items():
            if isinstance(value, float):
                if math.isnan(value) or value == float('inf') or value == float('-inf'):
                    target_set.add(key)

check_for_invalid_floats(pbi_package)

In [141]:
target_set

{'case_title',
 'city_council_district',
 'fire_district',
 'latitude',
 'longitude',
 'neighborhood_services_district',
 'on_time',
 'police_district',
 'precinct',
 'pwd_district',
 'resolution_time',
 'ward'}

In [142]:
# Go through each row, if a key's value is nan, inf, or -inf, set it to None
for row in pbi_package:
    for key in row.keys():
        if isinstance(row[key], float):
            if math.isnan(row[key]) or row[key] == float('inf') or row[key] == float('-inf'):
                row[key] = None

In [143]:
rows_request = {}
rows_request['rows'] = pbi_package
rows_request

{'rows': [{'case_enquiry_id': 101000295613,
   'open_dt': '2011-06-30T21:32:33Z',
   'sla_target_dt': '2011-07-14T21:32:32Z',
   'closed_dt': '2011-07-01T01:06:58Z',
   'on_time': 'ONTIME',
   'case_status': 'Closed',
   'closure_reason': 'resolved',
   'case_title': 'Street Light Outages',
   'subject': 'Public Works Department',
   'reason': 'Street Lights',
   'type': 'Street Light Outages',
   'queue': 'PWDx_Street Light Outages',
   'department': 'PWDx',
   'fire_district': '8',
   'pwd_district': '07',
   'city_council_district': '3',
   'police_district': 'B3',
   'neighborhood': 'Greater Mattapan',
   'neighborhood_services_district': '7',
   'ward': 'Ward 17',
   'precinct': '1714',
   'location_zipcode': '02126',
   'latitude': 42.27154416019889,
   'longitude': -71.07722102041515,
   'source': 'Employee Generated',
   'resolution_time': 0.14890046296296297,
   'open_year': 2011,
   'open_month': 6,
   'open_day': 30,
   'open_hour': 21,
   'open_minute': 32,
   'open_dayofwe

In [144]:
def chunk_data(data, size):
    """Split data into chunks."""
    return [data[i:i+size] for i in range(0, len(data), size)]

# Define the chunk size
CHUNK_SIZE = 10000  # This can be adjusted based on your needs

# Split the pbi_package into chunks
chunks = chunk_data(pbi_package, CHUNK_SIZE)

In [145]:
successful_chunks = []
failed_chunks = list(range(len(chunks)))

In [166]:
# post 100 chunks one by one, 
# Then cool down for 1 hour 
# Then post the next 100 chunks one by one
import time 
counter = 0 
print(f"Chunks remaining: {len(failed_chunks)}")
print(f"Chunks completed: {len(successful_chunks)}")
while failed_chunks:
    current_chunk_idx = failed_chunks.pop(0)
    current_chunk = chunks[current_chunk_idx]
    rows_request = {'rows': chunks[current_chunk_idx]}
    try:
        response = powerBIPostRows(group_id, dataset_id, table_name, rows_request, token)
        if response.status_code == 200:
            successful_chunks.append(current_chunk_idx)
            counter += 1
        else:
            raise Exception(f"Failed to send chunk {current_chunk_idx + 1} due to: {response.text}")
    except Exception as e:
        print(e)
        print(f"length of failed_chunks: {len(failed_chunks)}")
        failed_chunks.append(current_chunk_idx)
        print(f"length of failed_chunks: {len(failed_chunks)}")
        try:
            if "Request is blocked by the upstream service until:" in e.args[0]:
                date_string = e.args[0].split(" until: ")[1]
                date_string = date_string.rstrip("}")
                date_string = date_string.replace('"', '')  # remove inner quotation marks
                print("Rate limited until " + date_string)
                # Sleep until date in error message
                sleep_until = parser.parse(date_string)
                sleep_duration = sleep_until - datetime.now()
                print(f"Sleeping for {sleep_duration.total_seconds()//3600} hours")
                time.sleep(sleep_duration.total_seconds())
            elif "Access token has expired" in e.args[0]:
                app.acquire_token_for_client(scopes=scope)
                token = result['access_token']
        except Exception as inner_exception:
            print(f"Error while processing exception: {inner_exception}")
            continue

    if counter % 100 == 0:
        print(f"Sleeping for 1 hour at {datetime.now()}")
        print(f"Chunks remaining: {len(failed_chunks)}")
        print(f"Chunks completed: {len(successful_chunks)}")
        break

Chunks remaining: 7
Chunks completed: 264


In [161]:
# update failed chunks, by removing the successful chunks
failed_chunks = list(set(failed_chunks) - set(successful_chunks))
print(f"Chunks remaining: {len(failed_chunks)}")
print(f"Chunks completed: {len(successful_chunks)}")
# print total
print(f"Total chunks: {len(chunks)}")

Chunks remaining: 107
Chunks completed: 164
Total chunks: 271


In [148]:
failed_chunks.append(current_chunk_idx)