In [25]:
import requests
import pandas as pd
import json
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor
import logging

In [16]:
%run ./AzureKeyVault.ipynb

In [27]:
# Root logging setup for the notebook: emit INFO and above, with each record
# rendered as "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)

# Logger named after the current module (`__name__`); AustinDataAPI reports
# batch-download progress through it.
logger = logging.getLogger(__name__)

In [15]:
# Build the Spark session used by the notebook, or reuse one that is already
# active (`getOrCreate`). Executor memory, shuffle partitions and default
# parallelism are pinned here so that every run uses the same settings.
spark = (
    SparkSession.builder
    .appName("TrafficForecasting")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/05 11:22:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [184]:
class USCensusAPI:
    """Small client for a single query against the US Census Data API.

    The query is described by a dataset path, the variables (columns) to
    fetch and a geography predicate. The API key is read from the secret
    store via ``KeyVault().get_secret("CensusAPIKey")``; ``KeyVault`` must
    already be in scope (presumably defined by ``AzureKeyVault.ipynb`` --
    verify).
    """

    def __init__(self, dataset, cols, location):
        """Store the query description and fetch the API key.

        Args:
            dataset: Dataset path appended to the API host.
            cols: Variables to request. Either a ready-made comma-separated
                string or an iterable of variable names, which is joined
                with commas.
            location: Value of the ``for=`` geography predicate.
        """
        self.host = 'https://api.census.gov/data'
        self.dataset = dataset
        # Use the `cols` argument. The previous code read a global named
        # `variables`, which only worked when such a name happened to be
        # left over in the kernel (NameError on a fresh kernel otherwise).
        self.cols = cols if isinstance(cols, str) else ",".join(cols)
        self.location = location
        self.user_key = KeyVault().get_secret("CensusAPIKey")

    def get_query_url(self):
        """Return the full request URL, including the API key.

        The returned string embeds the secret key, so avoid logging or
        displaying it.
        """
        return f"{self.host}/{self.dataset}?get={self.cols}&for={self.location}&key={self.user_key}"

    def query_results(self):
        """Perform the GET request and return the raw response body as text."""
        return requests.get(self.get_query_url()).text

    def query_output(self):
        """Print the raw response body (handy for inspecting API errors)."""
        print(self.query_results())

    def query_df(self):
        """Return the response as a pandas DataFrame.

        The body is decoded as a JSON array of rows; the first row supplies
        the column names and the remaining rows the data.

        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        data = json.loads(self.query_results())
        return pd.DataFrame(
            data[1:],
            columns=data[0]
        )

In [82]:
class AustinDataAPI:
    """Client for the City of Austin open-data resource ``dx9v-zd7x``.

    Supports a single (optionally filtered/limited) query as well as a
    threaded batch download of every matching record. The app token is read
    via ``KeyVault().get_secret("AustinDataToken")``; ``KeyVault`` must
    already be in scope (presumably defined by ``AzureKeyVault.ipynb`` --
    verify).
    """

    def __init__(self, filter=None, limit=None, page=None):
        """Store the query options and fetch the app token.

        Args:
            filter: Optional expression sent as the ``$where`` parameter.
            limit: Optional maximum number of rows for a single query.
            page: Optional value sent as the ``page`` parameter (see the
                review note in ``_build_params``).
        """
        self.url = 'https://data.austintexas.gov/resource/dx9v-zd7x.json'
        self.app_token = KeyVault().get_secret("AustinDataToken")
        self.filter = filter
        self.limit = limit
        self.page = page

    def _build_params(self, offset=None, limit=None):
        """Assemble the query-string parameters for a data request."""
        params = {"$$app_token": self.app_token}

        if self.filter:
            params["$where"] = self.filter

        # A per-call limit (used by batching) wins over the instance limit.
        effective_limit = self.limit if limit is None else limit
        if effective_limit:
            # The row cap is `$limit`; the previous bare `limit` key was not
            # a paging parameter at all.
            params["$limit"] = effective_limit

        if self.page:
            # NOTE(review): paging on this API is documented in terms of
            # `$limit`/`$offset`; a bare `page` key is probably not
            # understood by the server. Kept unchanged because the intended
            # semantics (0- or 1-based, page size) are not visible here.
            params["page"] = self.page

        if offset is not None:
            params["$offset"] = offset
            # Offsets are only meaningful over a stable ordering, so paged
            # requests are ordered by the internal row id.
            params["$order"] = ":id"

        return params

    def query_results(self, offset=None, limit=None):
        """Run one request and return the decoded JSON payload.

        Args:
            offset: Optional number of rows to skip (``$offset``).
            limit: Optional per-call row cap overriding ``self.limit``.

        Returns:
            The decoded JSON body (a list of record dicts on success).

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        response = requests.get(self.url, params=self._build_params(offset, limit))
        response.raise_for_status()
        return response.json()

    def query_df(self):
        """Return the result of a single query as a pandas DataFrame."""
        # `query_results` already returns decoded JSON; calling `.json()` on
        # it again (as before) raised AttributeError.
        return pd.DataFrame.from_records(self.query_results())

    def get_total_records(self):
        """Return the number of records matching this instance's filter.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        params = {"$select": "count(*)", "$$app_token": self.app_token}
        if self.filter:
            # Count only what the data requests will actually return.
            params["$where"] = self.filter
        response = requests.get(self.url, params=params)
        response.raise_for_status()
        return int(response.json()[0]['count'])

    def get_flattened_data(self, batch_size=1000):
        """Download every matching record in parallel batches.

        ``self.limit`` is ignored here: each request asks for exactly
        ``batch_size`` rows so the offsets tile the result set without gaps
        or overlaps.

        Args:
            batch_size: Number of rows requested per batch; must be positive.

        Returns:
            A flat list of record dicts, in offset order.

        Raises:
            ValueError: If ``batch_size`` is not positive.
            requests.HTTPError: If any request fails.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        # Count once and reuse it (previously the count was requested twice).
        total_records = self.get_total_records()
        logger.info(f"Total records to process: {total_records}.")
        offsets = range(0, total_records, batch_size)

        def fetch_batch(offset):
            # Page size must equal the offset stride.
            return self.query_results(offset, limit=batch_size)

        results = []
        with ThreadPoolExecutor() as executor:
            # `executor.map` yields batches in the order of `offsets`.
            for offset, batch in zip(offsets, executor.map(fetch_batch, offsets)):
                logger.info(f"Processed batch starting at offset {offset}.")
                results.extend(batch)

        logger.info("Processing has been complete.")
        return results

    def get_spark_df(self, batch_size=1000):
        """Download all matching records and load them into a Spark DataFrame.

        Relies on the notebook-level ``spark`` session.

        Args:
            batch_size: Forwarded to ``get_flattened_data``.
        """
        logger.info("Creating spark dataframe for Austin API data.")
        return spark.createDataFrame(self.get_flattened_data(batch_size=batch_size))