In [2]:

import polars as pl
import logging
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, StringType, MapType

# Initialize Spark session
spark = SparkSession.builder.appName("ParseDataWithLogging").getOrCreate()

# Set up logging configuration
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger("ContactDataParser")

24/11/07 15:32:08 WARN Utils: Your hostname, codespaces-7cf8c9 resolves to a loopback address: 127.0.0.1; using 10.0.10.61 instead (on interface eth0)
24/11/07 15:32:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/07 15:32:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
# Load CSV data
csv_path = "resources/orders.csv"
# Read a CSV file
df = spark.read.csv(csv_path, header=True, inferSchema=True, sep=";")
# Show the DataFrame
#df.show()
# Read a CSV file with ";" as the separator
df = pl.read_csv(csv_path, separator=";")
# Force the data types for the columns
df_stg = df.with_columns([
    pl.col("order_id").cast(pl.Utf8),      # Force 'column1' to be String (Utf8)
    pl.col("date").cast(pl.Utf8),     # Force 'column2' to be Integer (Int64)
    pl.col("company_id").cast(pl.Utf8),   # Force 'column3' to be Float (Float64)
    pl.col("company_name").cast(pl.Utf8),   # Force 'column3' to be Float (Float64)
    pl.col("crate_type").cast(pl.Utf8),   # Force 'column3' to be Float (Float64)
    pl.col("contact_data").cast(pl.Utf8),   # Force 'column3' to be Float (Float64)
    pl.col("salesowners").cast(pl.Utf8)  # Force 'column3' to be Float (Float64)
])

orders_stable_schema = df_stg.fill_null("N/A")


In [55]:
type(df_stg['contact_data'][4])

NoneType

In [3]:
# Load JSON data
invoicing_data_df = pl.read_json("resources/invoicing_data.json")

In [13]:
import json

# Specify the path to the JSON file
file_path = "resources/invoicing_data.json"

# Open and load the JSON file
with open(file_path, 'r') as file:
    data = json.load(file)

# Display the contents
print(data.keys())
print(data['data'].keys())
print(data['data']['invoices'])
print(len(data['data']['invoices']))

json_content = data['data']['invoices']

dict_keys(['data'])
dict_keys(['invoices'])
[{'id': 'e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1', 'orderId': 'f47ac10b-58cc-4372-a567-0e02b2c3d479', 'companyId': '1e2b47e6-499e-41c6-91d3-09d12dddfbbd', 'grossValue': '324222', 'vat': '0'}, {'id': 'e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2', 'orderId': 'f47ac10b-58cc-4372-a567-0e02b2c3d480', 'companyId': '0f05a8f1-2bdf-4be7-8c82-4c9b58f04898', 'grossValue': '193498', 'vat': '19'}, {'id': 'e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3', 'orderId': 'f47ac10b-58cc-4372-a567-0e02b2c3d481', 'companyId': '1e2b47e6-499e-41c6-91d3-09d12dddfbbd', 'grossValue': '345498', 'vat': '21'}, {'id': 'e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4', 'orderId': 'f47ac10b-58cc-4372-a567-0e02b2c3d482', 'companyId': '1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6', 'grossValue': '245412', 'vat': '34'}, {'id': 'e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5', 'orderId': 'f47ac10b-58cc-4372-a567-0e02b2c3d483', 'companyId': '34538e39-cd2e-4641-8d24-3c94146e6f16', 'grossValue': '145467', 'vat': '0'}, {'id': 'e6e6

In [14]:
# Extract keys from the first dictionary as a reference
reference_keys = set(json_content[0].keys())

# Check each dictionary's keys against the reference keys
all_have_same_keys = all(set(d.keys()) == reference_keys for d in json_content)

if all_have_same_keys:
    print("All dictionaries have the same keys.")
else:
    print("Not all dictionaries have the same keys.")

All dictionaries have the same keys.


In [None]:
df = pl.DataFrame(json_content)
df_unique = df.unique()
print(len(df))
print(len(df_unique))


In [41]:
df_unique = df.unique()


In [28]:
duplicate_count = df.select(pl.all().is_duplicated().sum())

In [29]:
duplicate_count

id,orderId,companyId,grossValue,vat
u32,u32,u32,u32,u32
0,2,4,2,12


In [None]:
duplicates_by_name_age = df.filter(pl.struct(["companyId"]).is_duplicated())


In [35]:
duplicates_by_name_age = df.filter(pl.struct(["orderId","companyId","grossValue","vat"]).is_duplicated())


In [36]:
duplicates_by_name_age

id,orderId,companyId,grossValue,vat
str,str,str,str,str
"""e9e9e9e9-e9e9-e9e9-e9e9-e9e9e9…","""f47ac10b-58cc-4372-a567-0e02b2…","""5f0bdbdf-1d84-4c23-957c-8bb8c0…","""345310""","""34"""
"""ea9ea9ea-9ea9-9ea9-9ea9-9ea9ea…","""f47ac10b-58cc-4372-a567-0e02b2…","""5f0bdbdf-1d84-4c23-957c-8bb8c0…","""345310""","""34"""


In [18]:
# Filter to show only duplicated rows
duplicates = df.filter(pl.all().is_duplicated())

ComputeError: The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: 

	col("id").is_duplicated(),
	col("orderId").is_duplicated(),
	col("companyId").is_duplicated(),
	col("grossValue").is_duplicated(),
	col("vat").is_duplicated(),
This is ambiguous. Try to combine the predicates with the 'all' or `any' expression.

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'filter' <---
DF ["id", "orderId", "companyId", "grossValue"]; PROJECT */5 COLUMNS; SELECTION: None

In [4]:
df_parsed.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+--------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|            order_id|    date|          company_id|        company_name|crate_type|        contact_data|         salesowners|      parsed_contact|
+--------------------+--------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|f47ac10b-58cc-437...|29.01.22|1e2b47e6-499e-41c...|     Fresh Fruits Co|   Plastic|"[{ ""contact_nam...|Leonard Cohen, Lu...|{contact_surname ...|
|f47ac10b-58cc-437...|21.02.22|0f05a8f1-2bdf-4be...|         Veggies Inc|      Wood|"[{ ""contact_nam...|Luke Skywalker, D...|{contact_surname ...|
|f47ac10b-58cc-437...|03.04.22|1e2b47e6-499e-41c...|    Fresh Fruits c.o|     Metal|"[{ ""contact_nam...|      Luke Skywalker|{contact_surname ...|
|f47ac10b-58cc-437...|14.07.21|1c4b0b50-1d5d-463...|    Seafood Supplier|   Plastic|                NULL|David G

Error parsing entry: "[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]" - Extra data: line 1 column 6 (char 5)
Error parsing entry: "[{ ""contact_name"":""Maria"", ""contact_surname"":""Theresa"", ""city"":""Calcutta""}]" - Extra data: line 1 column 6 (char 5)
Error parsing entry: "[{ ""contact_name"":""Para"", ""contact_surname"":""Cetamol"", ""city"":""Frankfurt am Oder"", ""cp"": 3934}]" - Extra data: line 1 column 6 (char 5)
Error parsing entry: "[{ ""contact_name"":""John"", ""contact_surname"":""Krasinski"", ""city"":""New York"", ""cp"": ""1203""}]" - Extra data: line 1 column 6 (char 5)
Error parsing entry: "[{ ""contact_name"":""Jennifer"", ""contact_surname"":""Lopez"", ""city"":""Esplugues de Llobregat""}]" - Extra data: line 1 column 6 (char 5)
Error parsing entry: "{ ""contact_name"":""Liav"", ""contact_surname"": ""Ichenbaum"", ""city"":""Tel Aviv""}" - Extra data: line 1 column 5 (char 4)
Error parsing entry: "[{ 

In [7]:
# Define the parsing function with error handling and logging
def parse_contact_data(entry):
    try:
        # Log each entry being processed
        logger.info(f"Processing entry: {entry}")

        # Handle None or zero cases directly
        if entry in ["N/A", "0", ""]:
            logger.info(f"Entry is None, 0, or empty. Returning default dictionary for entry: {entry}")
            return {"contact_name": None, "contact_surname": None, "city": None, "cp": None}

        # Clean the entry and parse as JSON
        cleaned_entry = entry.replace("'", '"')  # Replace single quotes with double for JSON compatibility
        parsed_entry = json.loads(cleaned_entry)  # Parse the JSON string
        
        # Check if parsed entry is a list, and extract the first dictionary if so
        if isinstance(parsed_entry, list) and parsed_entry:
            parsed_entry = parsed_entry[0]  # Extract the first dictionary in the list

        # Confirm parsed_entry is a dictionary
        if not isinstance(parsed_entry, dict):
            logger.warning(f"Parsed entry is not a dictionary. Returning default for entry: {entry}")
            return {"contact_name": None, "contact_surname": None, "city": None, "cp": None}

        # Log successfully parsed dictionary
        logger.info(f"Successfully parsed dictionary: {parsed_entry}")

        # Return parsed data with missing keys handled as None
        return {
            "contact_name": parsed_entry.get("contact_name"),
            "contact_surname": parsed_entry.get("contact_surname"),
            "city": parsed_entry.get("city"),
            "cp": parsed_entry.get("cp")
        }
    except Exception as e:
        logger.error(f"Error parsing entry: {entry}. Error: {e}")
        # Return a dictionary with None values on parsing error
        return {"contact_name": None, "contact_surname": None, "city": None, "cp": None}

# Register the UDF with a MapType schema to return a dictionary
parse_udf = udf(parse_contact_data, MapType(StringType(), StringType()))

# Apply the UDF to the 'contact_data' column
df_parsed = df.withColumn("parsed_contact", parse_udf(col("contact_data")))

# Expand the dictionary into separate columns
df_result = df_parsed.select(
    col("parsed_contact")["contact_name"].alias("contact_name"),
    col("parsed_contact")["contact_surname"].alias("contact_surname"),
    col("parsed_contact")["city"].alias("city"),
    col("parsed_contact")["cp"].alias("cp")
)

# Show the result
df_result.show(truncate=False)

AttributeError: 'DataFrame' object has no attribute 'withColumn'

In [7]:
import polars as pl
import pandas as pd
from pyspark.sql import SparkSession
import json

# Convert PySpark DataFrame to Pandas DataFrame
df_pandas = df.toPandas()

In [4]:
# Define the parsing function with error handling and logging
def parse_contact_data(entry):
    try:
        # Log each entry being processed
        logger.info(f"Processing entry: {entry}")

        # Handle None or zero cases directly
        if entry in ["N/A", "0", ""]:
            logger.info(f"Entry is None, 0, or empty. Returning default dictionary for entry: {entry}")
            return {"contact_data": entry, "contact_name": None, "contact_surname": None, "city": None, "cp": None}

        # Clean the entry and parse as JSON
        cleaned_entry = entry.replace("'", '"')  # Replace single quotes with double for JSON compatibility
        parsed_entry = json.loads(cleaned_entry)  # Parse the JSON string
        
        # Check if parsed entry is a list, and extract the first dictionary if so
        if isinstance(parsed_entry, list) and parsed_entry:
            parsed_entry = parsed_entry[0]  # Extract the first dictionary in the list

        # Confirm parsed_entry is a dictionary
        if not isinstance(parsed_entry, dict):
            logger.warning(f"Parsed entry is not a dictionary. Returning default for entry: {entry}")
            return {"contact_data": entry, "contact_name": None, "contact_surname": None, "city": None, "cp": None}
        
        # Confirm parsed_entry is a dictionary
        if parsed_entry is None:
            logger.warning(f"Parsed entry is nond. Returning default for entry: {entry}")
            return {"contact_data": entry,"contact_name": None, "contact_surname": None, "city": None, "cp": None}

        # Log successfully parsed dictionary
        logger.info(f"Successfully parsed dictionary: {parsed_entry}")

        # Return parsed data with missing keys handled as None
        return {
            "contact_data": entry,
            "contact_name": parsed_entry.get("contact_name"),
            "contact_surname": parsed_entry.get("contact_surname"),
            "city": parsed_entry.get("city"),
            "cp": parsed_entry.get("cp")
        }
    except Exception as e:
        logger.error(f"Error parsing entry: {entry}. Error: {e}")
        # Return a dictionary with None values on parsing error
        return {"contact_data": entry,"contact_name": None, "contact_surname": None, "city": None, "cp": None}

In [5]:
import polars as pl
import pandas as pd
from pyspark.sql import SparkSession
import json


df_test = df_stg.with_columns((pl.col("contact_data").map_elements(lambda x: parse_contact_data(x), return_dtype=pl.Object).alias('new_col')))



ERROR:ContactDataParser:Error parsing entry: [{ "contact_name":"Liav", "contact_surname": "Ichenbaum", "city":"Tel Aviv"}. Error: Expecting ',' delimiter: line 1 column 77 (char 76)


In [6]:
df_normalized = pl.json_normalize(df_test['new_col'])

In [31]:
df_normalized_unique = df_normalized.unique()


In [32]:
len(df_normalized_unique)

22

In [13]:
df_test['salesowners'][5]

'Leonard Cohen, David Henderson'

In [14]:
df_test.head()

order_id,date,company_id,company_name,crate_type,contact_data,salesowners,new_col
str,str,str,str,str,str,str,object
"""f47ac10b-58cc-4372-a567-0e02b2…","""29.01.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}"
"""f47ac10b-58cc-4372-a567-0e02b2…","""21.02.22""","""0f05a8f1-2bdf-4be7-8c82-4c9b58…","""Veggies Inc""","""Wood""","""[{ ""contact_name"":""Maria"", ""co…","""Luke Skywalker, David Goliat, …","{'contact_data': '[{ ""contact_name"":""Maria"", ""contact_surname"":""Theresa"", ""city"":""Calcutta""}]', 'contact_name': 'Maria', 'contact_surname': 'Theresa', 'city': 'Calcutta', 'cp': None}"
"""f47ac10b-58cc-4372-a567-0e02b2…","""03.04.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits c.o""","""Metal""","""[{ ""contact_name"":""Para"", ""con…","""Luke Skywalker""","{'contact_data': '[{ ""contact_name"":""Para"", ""contact_surname"":""Cetamol"", ""city"":""Frankfurt am Oder"", ""cp"": 3934}]', 'contact_name': 'Para', 'contact_surname': 'Cetamol', 'city': 'Frankfurt am Oder', 'cp': 3934}"
"""f47ac10b-58cc-4372-a567-0e02b2…","""14.07.21""","""1c4b0b50-1d5d-463a-b56e-1a6fd3…","""Seafood Supplier""","""Plastic""","""N/A""","""David Goliat, Leonard Cohen""","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}"
"""f47ac10b-58cc-4372-a567-0e02b2…","""23.10.22""","""34538e39-cd2e-4641-8d24-3c9414…","""Meat Packers Ltd""","""Plastic""","""N/A""","""Chris Pratt, David Henderson, …","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}"


In [15]:
df_duplicates = (
    df_test.groupby("order_id").agg(pl.col("salesowners").n_unique().alias("unique_name_count")).filter(pl.col("unique_name_count") > 1)
)


AttributeError: 'DataFrame' object has no attribute 'groupby'

In [16]:
# Group by company_id and count unique company_name entries
df_duplicates = (
    df_test.group_by("order_id")  # use groupby() not groupby
    .agg(pl.col("salesowners").n_unique().alias("unique_name_count"))
    .filter(pl.col("unique_name_count") > 1)
)

In [None]:
orders_id_per_salesowners = orders_stable_schema.select(["order_id", "salesowners"])


order_id,unique_name_count
str,u32


In [33]:
df_combined = df_test.join(df_normalized_unique, how="inner", on = "contact_data")

In [34]:
len(df_combined)

62

In [35]:
df_combined

order_id,date,company_id,company_name,crate_type,contact_data,salesowners,new_col,contact_name,contact_surname,city,cp
str,str,str,str,str,str,str,object,str,str,str,str
"""f47ac10b-58cc-4372-a567-0e02b2…","""29.01.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""21.02.22""","""0f05a8f1-2bdf-4be7-8c82-4c9b58…","""Veggies Inc""","""Wood""","""[{ ""contact_name"":""Maria"", ""co…","""Luke Skywalker, David Goliat, …","{'contact_data': '[{ ""contact_name"":""Maria"", ""contact_surname"":""Theresa"", ""city"":""Calcutta""}]', 'contact_name': 'Maria', 'contact_surname': 'Theresa', 'city': 'Calcutta', 'cp': None}","""Maria""","""Theresa""","""Calcutta""",
"""f47ac10b-58cc-4372-a567-0e02b2…","""03.04.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits c.o""","""Metal""","""[{ ""contact_name"":""Para"", ""con…","""Luke Skywalker""","{'contact_data': '[{ ""contact_name"":""Para"", ""contact_surname"":""Cetamol"", ""city"":""Frankfurt am Oder"", ""cp"": 3934}]', 'contact_name': 'Para', 'contact_surname': 'Cetamol', 'city': 'Frankfurt am Oder', 'cp': 3934}","""Para""","""Cetamol""","""Frankfurt am Oder""","""3934"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""14.07.21""","""1c4b0b50-1d5d-463a-b56e-1a6fd3…","""Seafood Supplier""","""Plastic""","""N/A""","""David Goliat, Leonard Cohen""","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}",,,,
"""f47ac10b-58cc-4372-a567-0e02b2…","""23.10.22""","""34538e39-cd2e-4641-8d24-3c9414…","""Meat Packers Ltd""","""Plastic""","""N/A""","""Chris Pratt, David Henderson, …","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}",,,,
…,…,…,…,…,…,…,…,…,…,…,…
"""f47ac10b-58cc-4372-a567-0e02b2…","""11.05.25""","""5f0bdbdf-1d84-4c23-957c-8bb8c0…","""Tropical Fruits Ltd""","""Metal""","""[{ ""contact_name"":""Liav"", ""con…","""Yuri Gagarin, David Goliat""","{'contact_data': '[{ ""contact_name"":""Liav"", ""contact_surname"": ""Ichenbaum"", ""city"":""Tel Aviv""}', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}",,,,
"""f47ac10b-58cc-4372-a567-0e02b2…","""20.05.25""","""acdb6f30-764f-404e-8b8e-7e7e3e…","""Organic Farms""","""Plastic""","""[{ ""contact_name"":""Jennifer"", …","""Leon Leonov, Chris Pratt""","{'contact_data': '[{ ""contact_name"":""Jennifer"", ""contact_surname"":""Lopez"", ""city"":""Esplugues de Llobregat""}]', 'contact_name': 'Jennifer', 'contact_surname': 'Lopez', 'city': 'Esplugues de Llobregat', 'cp': None}","""Jennifer""","""Lopez""","""Esplugues de Llobregat""",
"""f47ac10b-58cc-4372-a567-0e02b2…","""27.05.25""","""2e90f2b1-d237-47a6-96e8-6d01c0…","""Seafood Supplier GmbH""","""Metal""","""N/A""","""Ammy Winehouse, Leonard Cohen""","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}",,,,
"""f47ac10b-58cc-4372-a567-0e02b2…","""04.06.25""","""fa14c3ed-3c48-49f4-bd69-4d7f5b…","""Green Veg Co""","""Plastic""","""N/A""","""Luke Skywalker, Chris Pratt""","{'contact_data': 'N/A', 'contact_name': None, 'contact_surname': None, 'city': None, 'cp': None}",,,,


In [29]:
df_combined = df_test.join(df_normalized, how="inner", on = "contact_data")


In [30]:
df_combined

order_id,date,company_id,company_name,crate_type,contact_data,salesowners,new_col,contact_name,contact_surname,city,cp
str,str,str,str,str,str,str,object,str,str,str,str
"""f47ac10b-58cc-4372-a567-0e02b2…","""29.01.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""07.11.22""","""27c59f76-5d26-4b82-a89b-59f8df…","""Healthy Snacks""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Ammy Winehouse, Marie Curie, C…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""27.03.24""","""9851fa89-7b7a-4ed5-89f7-65c205…","""Fruit Market Ltd""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, David Henderson…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""17.11.24""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""21.03.25""","""27c59f76-5d26-4b82-a89b-59f8df…","""Healthy Snacks""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Ammy Winehouse, Marie Curie, C…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
…,…,…,…,…,…,…,…,…,…,…,…
"""f47ac10b-58cc-4372-a567-0e02b2…","""07.11.22""","""27c59f76-5d26-4b82-a89b-59f8df…","""Healthy Snacks""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Ammy Winehouse, Marie Curie, C…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""27.03.24""","""9851fa89-7b7a-4ed5-89f7-65c205…","""Fruit Market Ltd""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, David Henderson…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""17.11.24""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""21.03.25""","""27c59f76-5d26-4b82-a89b-59f8df…","""Healthy Snacks""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Ammy Winehouse, Marie Curie, C…","{'contact_data': '[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]', 'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}","""Curtis""","""Jackson""","""Chicago""","""12345"""


In [97]:
# Step 1: Normalize the 'contact_data' column (which is a dictionary column)
df_normalized = pl.json_normalize(df_test['new_col'])

# Step 2: Join the normalized columns back with the original DataFrame
df_combined = df_test.join(df_normalized, how="right_on")

ValueError: must specify `on` OR `left_on` and `right_on`

In [None]:
df_test.select(pl.col('new_col')).unnest('new_col')

AttributeError: 'Expr' object has no attribute 'json_decode'

In [90]:
df_test.head(2)

order_id,date,company_id,company_name,crate_type,contact_data,salesowners,new_col
str,str,str,str,str,str,str,object
"""f47ac10b-58cc-4372-a567-0e02b2…","""29.01.22""","""1e2b47e6-499e-41c6-91d3-09d12d…","""Fresh Fruits Co""","""Plastic""","""[{ ""contact_name"":""Curtis"", ""c…","""Leonard Cohen, Luke Skywalker,…","{'contact_name': 'Curtis', 'contact_surname': 'Jackson', 'city': 'Chicago', 'cp': '12345'}"
"""f47ac10b-58cc-4372-a567-0e02b2…","""21.02.22""","""0f05a8f1-2bdf-4be7-8c82-4c9b58…","""Veggies Inc""","""Wood""","""[{ ""contact_name"":""Maria"", ""co…","""Luke Skywalker, David Goliat, …","{'contact_name': 'Maria', 'contact_surname': 'Theresa', 'city': 'Calcutta', 'cp': None}"


In [93]:
pl.json_normalize(df_test['new_col'])


contact_name,contact_surname,city,cp
str,str,str,str
"""Curtis""","""Jackson""","""Chicago""","""12345"""
"""Maria""","""Theresa""","""Calcutta""",
"""Para""","""Cetamol""","""Frankfurt am Oder""","""3934"""
,,,
,,,
…,…,…,…
,,,
"""Jennifer""","""Lopez""","""Esplugues de Llobregat""",
,,,
,,,


In [77]:

df_stg_extracted = df_test.with_columns([
    pl.col("new_col").arr.get("contact_name").alias("contact_name"),
    pl.col("new_col").arr.get("contact_surname").alias("contact_surname"),
    pl.col("new_col").arr.get("city").alias("city"),
    pl.col("new_col").arr.get("cp").alias("cp")
])

ColumnNotFoundError: contact_name

Resolved plan until failure:

	---> FAILED HERE RESOLVING 'with_columns' <---
DF ["order_id", "date", "company_id", "company_name"]; PROJECT */8 COLUMNS; SELECTION: None

In [None]:
orders_id_per_salesowners = orders_stable_schema.select(["order_id", "salesowners"])



In [23]:
# Split the 'names' column on ', ' and then explode it
df_expanded = (
    orders_id_per_salesowners.with_columns(pl.col("salesowners").str.split(",").alias("names"))  # Split each entry by comma and space
      .explode("names")  # Expand each list into separate rows
)

In [26]:
orders_id_per_salesowners_1 = orders_id_per_salesowners.unique()


In [25]:
orders_id_per_salesowners = orders_stable_schema.select(["order_id", "salesowners"])

# Split the 'names' column on ', ' and then explode it
orders_id_per_salesowners = (
    orders_id_per_salesowners.with_columns(pl.col("salesowners").str.split(",").alias("names"))  # Split each entry by comma and space
      .explode("names")  # Expand each list into separate rows
)

orders_id_per_salesowners = orders_id_per_salesowners.drop("salesowners")

orders_id_per_salesowners.rename({"names": "salesowners"})


order_id,salesowners
str,str
"""f47ac10b-58cc-4372-a567-0e02b2…","""Leonard Cohen"""
"""f47ac10b-58cc-4372-a567-0e02b2…",""" Luke Skywalker"""
"""f47ac10b-58cc-4372-a567-0e02b2…",""" Ammy Winehouse"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""Luke Skywalker"""
"""f47ac10b-58cc-4372-a567-0e02b2…",""" David Goliat"""
…,…
"""f47ac10b-58cc-4372-a567-0e02b2…",""" Leonard Cohen"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""Luke Skywalker"""
"""f47ac10b-58cc-4372-a567-0e02b2…",""" Chris Pratt"""
"""f47ac10b-58cc-4372-a567-0e02b2…","""Leonard Cohen"""
