In [None]:
%pip install openai==1.12.0

In [None]:
from openai import AzureOpenAI
from IPython.display import display, HTML
import os
import textwrap
import json 

OPENAI_GPT4_DEPLOYMENT_NAME="gpt-4"
OPENAI_DEPLOYMENT_ENDPOINT="<your openai deployment endpoint>" 
OPENAI_API_KEY="<your openai api key>"

In [None]:
client = AzureOpenAI(
        azure_endpoint=OPENAI_DEPLOYMENT_ENDPOINT,
        api_key=OPENAI_API_KEY,
        api_version="2023-09-01-preview"
    )

In [15]:
#create a DataFrame and save it as a table in spark and save it as a parquet file and then into a MS Fabric Lakehouse table called people
from pyspark.sql.functions import col, year, month, dayofmonth, avg

df = spark.createDataFrame([(1, "John Doe", "60654"), (2, "Jane Doe", "59001"), (3, "Tom Smith", "32501")], ["id", "name", "zipcode"])
df.show()
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("people")

StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 22, Finished, Available, Finished)

+---+---------+-------+
| id|     name|zipcode|
+---+---------+-------+
|  1| John Doe|  60654|
|  2| Jane Doe|  59001|
|  3|Tom Smith|  32501|
+---+---------+-------+



## Using OpenAI to enrich the data

In [8]:
def call_openAI(text):

    system_message = """
You are an AI assistant that helps people find demographic information for zip codes for research purposes
1. Evaluate the given US zip code and provide demographic information for the zip code as follows:
    Age distribution, Family structure, Income distribution, Education level, Employment status, Housing type, Urban/Suburban/Rural classification
2. Do not provide any additional examples to the output, just the JSON format.
3. Provide the output in a valid JSON format that can be serialized as a JSON object, as follows only specifying one value with the highest probability:
    Output:
        {
        "AgeDistribution": "18-24, 25-34, 35-44, 45-54, 55-64, 65+",
        "FamilyStructure": "single, family, retired, other",
        "IncomeDistribution": "$0-$25k, $25k-$50k, $50k-$75k, $75k-$100k, $100k-$150k, $150k+",
        "EducationLevel": "Primary, High School, Postgraduate, no graduation",
        "EmploymentStatus": "Self-employed, Employed, Unemployed, Retired, Student",
        "HousingType": " Single-family, Multi-family, Condo, Townhouse, Apartment, Mobile home",
        "UrbanClassification": "Urban, Suburban, Rural"
        }
    """

    response = client.chat.completions.create(
        model=OPENAI_GPT4_DEPLOYMENT_NAME,
        response_format={ "type": "json_object" },
        messages = [
            {"role":"system","content":system_message},
            {"role":"user","content":text}
            ],
        temperature=0.7,
        max_tokens=800,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None
    )

    return response.choices[0].message.content

def prettyprint(text: str) -> str:
    print(textwrap.fill(text, 60))


StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 15, Finished, Available, Finished)

In [9]:
# checking it works
answer = call_openAI("60654")
display(HTML(answer))

StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 16, Finished, Available, Finished)

In [16]:
#let's create an enriching data of zipcodes info
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

data =[( "60654", "25-34", "single", "100k−150k", "Postgraduate", "Employed", "Apartment", "Urban" )]

schema = StructType([ \
    StructField("zipcode",StringType(),True), \
    StructField("AgeDistribution",StringType(),True), \
    StructField("FamilyStructure",StringType(),True), \
    StructField("IncomeDistribution", StringType(), True), \
    StructField("EducationLevel", StringType(), True), \
    StructField("EmploymentStatus", StringType(), True), \
    StructField("HousingType", StringType(), True), \
    StructField("UrbanClassification", StringType(), True) \
  ])

df_zipcodes = spark.createDataFrame(data = data, schema=schema)
df_zipcodes.show()
df_zipcodes.write.format("delta").option("mergeSchema", "true").mode("overwrite").saveAsTable("zipcodes")


StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 23, Finished, Available, Finished)

+-------+---------------+---------------+------------------+--------------+----------------+-----------+-------------------+
|zipcode|AgeDistribution|FamilyStructure|IncomeDistribution|EducationLevel|EmploymentStatus|HousingType|UrbanClassification|
+-------+---------------+---------------+------------------+--------------+----------------+-----------+-------------------+
|  60654|          25-34|         single|         100k−150k|  Postgraduate|        Employed|  Apartment|              Urban|
+-------+---------------+---------------+------------------+--------------+----------------+-----------+-------------------+



## Now we can enrich the data

In [17]:
import pandas as pd
import json 
import pyspark.pandas as ps

results = spark.sql("select people.zipcode from people where not exists(select * from zipcodes where people.zipcode = zipcodes.zipcode)")
tableList = [x["zipcode"] for x in results.rdd.collect()]

json_list = []

for item in tableList:
    json_result = call_openAI(item)
    json_object = json.loads(json_result)
    json_object["zipcode"] = item
    json_list.append(json_object)
display(json_list)


StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 24, Finished, Available, Finished)

[{'AgeDistribution': '45-54',
  'FamilyStructure': 'family',
  'IncomeDistribution': '$50k-$75k',
  'EducationLevel': 'High School',
  'EmploymentStatus': 'Employed',
  'HousingType': 'Single-family',
  'UrbanClassification': 'Rural',
  'zipcode': '59001'},
 {'AgeDistribution': '25-34',
  'FamilyStructure': 'single',
  'IncomeDistribution': '$25k-$50k',
  'EducationLevel': 'High School',
  'EmploymentStatus': 'Employed',
  'HousingType': 'Apartment',
  'UrbanClassification': 'Urban',
  'zipcode': '32501'}]

In [18]:
df = pd.DataFrame(json_list)
spark.createDataFrame(df).write.format("delta").mode("append").saveAsTable("zipcodes")

StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 25, Finished, Available, Finished)

In [19]:
df_test = spark.read.table("zipcodes")
display(df_test)

StatementMeta(, 4e30de6b-80ff-4203-9cdb-dca6b62ced94, 26, Finished, Available, Finished)

DataFrame[zipcode: string, AgeDistribution: string, FamilyStructure: string, IncomeDistribution: string, EducationLevel: string, EmploymentStatus: string, HousingType: string, UrbanClassification: string]