In [0]:
import os

# Raw-data export API credentials. They are read from the environment rather
# than hardcoded in the notebook, so secrets never end up in the saved file or
# its version history (on Databricks a secret scope is another option).
# Each defaults to an empty string when the variable is unset.
rawdata_client_id = os.environ.get('RAWDATA_CLIENT_ID', '')
rawdata_client_secret = os.environ.get('RAWDATA_CLIENT_SECRET', '')
rawdata_date = '20240905'  # Export date in YYYYMMDD format
rawdata_format = 'avro'    # Export format, e.g. 'avro' or 'csv' (the parsing cells below read Avro)

# Directory to save downloaded export files (if using Databricks, this should be in the /dbfs/ directory)
avro_dir = '/dbfs/tmp/avro_files/'

rawdata_token_url = 'https://api.repo361.com/security/oauth2/token'
rawdata_api_url = 'https://api.repo361.com/rawdata/export'

In [0]:
import requests
import os

In [0]:
import shutil

# Start every run from an empty download directory: remove whatever a previous
# run left behind, then (re)create the directory.
if os.path.exists(avro_dir):
    shutil.rmtree(avro_dir)
os.makedirs(avro_dir)

In [0]:
# Obtain an OAuth 2.0 bearer token using the client-credentials grant.
# Reset first so a failed re-run can never leave a stale token behind.
access_token = ''

# Fail fast with a clear message instead of sending empty credentials.
if not rawdata_client_id or not rawdata_client_secret:
    raise ValueError("rawdata_client_id and rawdata_client_secret must be set before requesting a token")

headers = {
    'Content-Type': 'application/x-www-form-urlencoded'
}
# Form data for the token request
data = {
    'client_id': rawdata_client_id,
    'client_secret': rawdata_client_secret,
    'grant_type': 'client_credentials'
}

# Seconds to wait for the token endpoint; without a timeout a stalled
# connection would block this cell indefinitely.
token_timeout_s = 30
response = requests.post(rawdata_token_url, headers=headers, data=data, timeout=token_timeout_s)

# Stop here on failure: every later cell needs a valid token, and continuing
# would only send "Bearer " with an empty token to the export API.
if response.status_code != 200:
    raise RuntimeError(f"Failed to get access token. Status Code: {response.status_code}")

access_token = response.json()['access_token']

In [0]:
headers = {'Authorization': 'Bearer ' + access_token}

# Body of the export request: which day and which file format to export
payload = {
    'Date': rawdata_date,
    'Format': rawdata_format
}

# Seconds to wait for the export API / file host; without a timeout a stalled
# connection would block this cell indefinitely.
request_timeout_s = 60
# Downloaded files are named with the requested format's extension.
file_ext = '.' + rawdata_format
# Download in 1 MiB chunks instead of holding each whole file in memory.
download_chunk_size = 1024 * 1024

response = requests.post(rawdata_api_url, headers=headers, json=payload, timeout=request_timeout_s)

# Nothing downstream can work without the file list, so stop here on failure.
if response.status_code != 200:
    raise RuntimeError(f"Failed to fetch URLs from API. Status Code: {response.status_code}")

# 'files' is a list of {'name': ..., 'url': ...} entries; treat a missing or null value as empty.
files = response.json().get('files') or []
print(f"Number of {rawdata_format} files: {len(files)}")

for entry in files:
    file_name = entry.get('name')
    file_url = entry.get('url')
    if not file_name or not file_url:
        print(f"Skipping export entry without a name or URL: {entry!r}")
        continue

    # The name comes from the remote API: keep only its last path component so
    # it cannot escape avro_dir (e.g. "../x" or an absolute path).
    file_name = os.path.basename(file_name)
    if not file_name.endswith(file_ext):
        file_name += file_ext
    file_path = os.path.join(avro_dir, file_name)

    # Stream the body to disk; the context manager releases the connection.
    with requests.get(file_url, stream=True, timeout=request_timeout_s) as file_response:
        if file_response.status_code != 200:
            print(f"Failed to download {file_name}. Status Code: {file_response.status_code}")
            continue
        with open(file_path, 'wb') as f:
            for chunk in file_response.iter_content(chunk_size=download_chunk_size):
                f.write(chunk)

In [0]:
%pip install fastavro


In [0]:
%restart_python

In [0]:
# Accumulator for the flattened conversation dicts built from the Avro data
records = list()

In [0]:
import glob
import os

import fastavro

# `%restart_python` above discards every variable from earlier cells, so the
# download directory is restated here. Keep it in sync with `avro_dir` in the
# configuration cell.
AVRO_DIR = "/dbfs/tmp/avro_files/"
# The export may be split into numbered chunks (..._001.avro, ..._002.avro, ...);
# read every chunk instead of only the first one.
AVRO_FILE_PATTERN = "conversations_detail_without_attributes_*.avro"


def _metric(mm):
    """Return the metric fields kept for the Spark DataFrame."""
    return {
        "Name": mm['Name'],
        "Value": mm['Value'],
        "EmitDate": mm['EmitDate'],
    }


def _segment(sgm):
    """Return the segment fields kept for the Spark DataFrame."""
    return {
        "DisconnectType": sgm['DisconnectType'],
        "SegmentType": sgm['SegmentType'],
        "SegmentStart": sgm['SegmentStart'],
        "SegmentEnd": sgm['SegmentEnd'],
        "QueueId": sgm['QueueId'],
        "WrapUpCode": sgm['WrapUpCode'],
    }


def _session(ss):
    """Return one session's fields plus its flattened segments and metrics."""
    return {
        "MediaType": ss['MediaType'],
        "Direction": ss['Direction'],
        "SessionId": ss['SessionId'],
        "OutboundContactListId": ss['OutboundContactListId'],
        "OutboundContactId": ss['OutboundContactId'],
        "OutboundCampaignId": ss['OutboundCampaignId'],
        "Ani": ss['Ani'],
        "Dnis": ss['Dnis'],
        # `or []` guards against a null array in case the Avro field is nullable.
        "Segments": [_segment(sgm) for sgm in (ss['Segments'] or [])],
        "Metrics": [_metric(mm) for mm in (ss['Metrics'] or [])],
    }


def _participant(part):
    """Return one participant's fields plus its flattened sessions."""
    return {
        "ParticipantId": part['ParticipantId'],
        "ParticipantName": part['ParticipantName'],
        "Purpose": part['Purpose'],
        "UserId": part['UserId'],
        "Sessions": [_session(ss) for ss in (part['Sessions'] or [])],
    }


def _conversation(conv):
    """Return one conversation dict shaped like `conversation_schema`."""
    return {
        "ConversationId": conv['ConversationId'],
        "ConversationStart": conv['ConversationStart'],
        "ConversationEnd": conv['ConversationEnd'],
        "Participants": [_participant(part) for part in (conv['Participants'] or [])],
    }


avro_file_paths = sorted(glob.glob(os.path.join(AVRO_DIR, AVRO_FILE_PATTERN)))
if not avro_file_paths:
    raise FileNotFoundError(f"No files matching {AVRO_FILE_PATTERN!r} found in {AVRO_DIR}")

# Rebuilt from scratch so that re-running this cell does not duplicate rows.
records = []
for avro_file_path in avro_file_paths:
    with open(avro_file_path, 'rb') as file:
        # Each Avro record holds a list of conversations under 'Entities'.
        for record in fastavro.reader(file):
            records.extend(_conversation(conv) for conv in (record['Entities'] or []))

print(f"Avro files read: {len(avro_file_paths)}")
print(f"Number of records: {len(records)}")


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType, TimestampType

# Each schema below is an ordered list of (column name, Spark type) pairs;
# every column is nullable. The nesting mirrors the dicts built from the Avro
# data: Conversation -> Participants -> Sessions -> (Segments, Metrics).

# A single metric emitted for a session
metrics_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("Name", StringType()),
        ("Value", LongType()),
        ("EmitDate", TimestampType()),
    ]
])

# A single segment of a session
segments_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("DisconnectType", StringType()),
        ("SegmentType", StringType()),
        ("SegmentStart", TimestampType()),
        ("SegmentEnd", TimestampType()),
        ("QueueId", StringType()),
        ("WrapUpCode", StringType()),
    ]
])

# A participant's session, carrying its segments and metrics
sessions_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("MediaType", StringType()),
        ("Direction", StringType()),
        ("SessionId", StringType()),
        ("OutboundContactListId", StringType()),
        ("OutboundContactId", StringType()),
        ("OutboundCampaignId", StringType()),
        ("Ani", StringType()),
        ("Dnis", StringType()),
        ("Segments", ArrayType(segments_schema)),
        ("Metrics", ArrayType(metrics_schema)),
    ]
])

# A conversation participant and their sessions
participants_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("ParticipantId", StringType()),
        ("ParticipantName", StringType()),
        ("Purpose", StringType()),
        ("UserId", StringType()),
        ("Sessions", ArrayType(sessions_schema)),
    ]
])

# Top-level row: one conversation with all of its participants
conversation_schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("ConversationId", StringType()),
        ("ConversationStart", TimestampType()),
        ("ConversationEnd", TimestampType()),
        ("Participants", ArrayType(participants_schema)),
    ]
])

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row  # Import Row from pyspark.sql

# Get the Spark session; getOrCreate returns the already-active session if one exists.
spark = SparkSession.builder.appName("AvroToDataFrame").getOrCreate()

# Pass the record dicts straight to Spark: with an explicit StructType, dict
# keys are matched to schema fields by name. Wrapping them in Row(**record)
# adds nothing. On Spark < 3.0, keyword Rows also sort their fields
# alphabetically, which risks mis-aligning them with the schema's field order.
df = spark.createDataFrame(records, schema=conversation_schema)


In [0]:
from pyspark.sql import functions as F

# Flatten the nested arrays down to one row per metric. `explode` drops rows
# whose array is null or empty, so conversations/participants/sessions without
# metrics do not appear in the result.

# One row per participant
df_participants = df.withColumn("participant", F.explode("Participants"))

# One row per session of each participant
df_sessions = df_participants.withColumn("session", F.explode("participant.Sessions"))

# One row per metric of each session
df_metrics = df_sessions.withColumn("metric", F.explode("session.Metrics"))

# Keep the conversation context, the participant name and the metric fields
df_selected_metrics = df_metrics.select(
    "ConversationId",
    "ConversationStart",
    "ConversationEnd",
    "participant.ParticipantName",
    "metric.Name",
    "metric.Value",
    "metric.EmitDate"
)

# Keep only metrics that have both a name and a value. Refer to the columns by
# the names `select` produced ("Name", "Value"). The `metric` struct is no
# longer a column of df_selected_metrics, and resolving it there would rely on
# Spark implicitly re-adding dropped columns.
df_filtered_metrics = df_selected_metrics.filter(
    F.col("Name").isNotNull() & F.col("Value").isNotNull()
)

# Show the filtered metrics
df_filtered_metrics.show(truncate=False)
