In [1]:
import re
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, col, udf, coalesce, lit
from pyspark.sql.types import StringType, StructType, StructField
from pathlib import Path
import pyspark.sql.functions as F
#from pyspark.sql.functions import when

In [21]:
    # .config("spark.sql.warehouse.dir", warehouse_uri) \
    # .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    # .config("spark.sql.catalog.spark_catalog.type", "hadoop") \
# Reinitialize Spark session with updated warehouse configuration
spark = SparkSession.builder \
    .appName("IcebergWrite") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse") \
    .getOrCreate()
spark

25/01/13 17:14:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
df = spark.read.json("input/expedia-lodging-policies-en_us-1-all.jsonl")

                                                                                

In [4]:

df.show(5)

+--------------+--------------------+--------------------+----------------+--------------------+------------+-------------------------+--------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+---------------+--------------------+
|checkInEndTime| checkInInstructions|       checkInPolicy|checkInStartTime|      checkOutPolicy|checkOutTime|childrenAndExtraBedPolicy|       country|      formsOfPayment|     knowBeforeYouGo|minimumAge|       paymentPolicy|           petPolicy|          propertyId|   propertyType| specialInstructions|
+--------------+--------------------+--------------------+----------------+--------------------+------------+-------------------------+--------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+---------------+--------------------+
|          NULL|                  []|[Check-in time st...|         4:00 PM|[Check-out be

In [5]:
def remove_html_tags_array(text_array):
    def remove_html_tags(text):
        return re.sub('<.*?>', '', text)

    return ' '.join([remove_html_tags(text) for text in text_array])

# Register the function as a UDF
remove_html_tags_udf = udf(remove_html_tags_array, StringType())

In [17]:
df.columns

['checkInEndTime',
 'checkInInstructions',
 'checkInPolicy',
 'checkInStartTime',
 'checkOutPolicy',
 'checkOutTime',
 'childrenAndExtraBedPolicy',
 'country',
 'formsOfPayment',
 'knowBeforeYouGo',
 'minimumAge',
 'paymentPolicy',
 'petPolicy',
 'propertyId',
 'propertyType',
 'specialInstructions']

In [18]:
df.select("propertyId").printSchema()

root
 |-- propertyId: struct (nullable = true)
 |    |-- expedia: string (nullable = true)
 |    |-- hcom: string (nullable = true)
 |    |-- vrbo: string (nullable = true)



In [16]:
df.printSchema()

root
 |-- checkInEndTime: string (nullable = true)
 |-- checkInInstructions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checkInPolicy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checkInStartTime: string (nullable = true)
 |-- checkOutPolicy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checkOutTime: string (nullable = true)
 |-- childrenAndExtraBedPolicy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- formsOfPayment: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- knowBeforeYouGo: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- minimumAge: string (nullable = true)
 |-- paymentPolicy: struct (nullable = true)
 |    |-- localCurrency: string (nullable = true)
 |    |-- optionalExtras: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |  

In [7]:
import pyspark.sql.functions as F
from pyspark.sql.functions import when

# Assuming 'new_df' is your DataFrame

# Define a UDF to map country names to country codes
def get_country_code(country_name):
    country_code_mapping = {
        "Aland Islands": "AX",
        "Albania": "AL",
        "Algeria": "DZ",
        "American Samoa": "AS",
        "Andorra": "AD",
        "Angola": "AO",
        "Anguilla": "AI",
        "Antigua and Barbuda": "AG",
        "Argentina": "AR",
        "Armenia": "AM",
        "Aruba": "AW",
        "Australia": "AU",
        "Austria": "AT",
        "Azerbaijan": "AZ",
        "Bahamas": "BS",
        "Bahrain": "BH",
        "Bangladesh": "BD",
        "Barbados": "BB",
        "Belarus": "BY",
        "Belgium": "BE",
        "Belize": "BZ",
        "Benin": "BJ",
        "Bermuda": "BM",
        "Bhutan": "BT",
        "Bolivia": "BO",
        "Bonaire Saint Eustatius and Saba": "BQ",
        "Bosnia and Herzegovina": "BA",
        "Botswana": "BW",
        "Brazil": "BR",
        "British Virgin Islands": "VG",
        "Brunei": "BN",
        "Bulgaria": "BG",
        "Burkina Faso": "BF",
        "Burundi": "BI",
        "Cambodia": "KH",
        "Cameroon": "CM",
        "Canada": "CA",
        "Cape Verde": "CV",
        "Cayman Islands": "KY",
        "Chad": "TD",
        "Chile": "CL",
        "China": "CN",
        "Christmas Island": "CX",
        "Colombia": "CO",
        "Comoros": "KM",
        "Cook Islands": "CK",
        "Costa Rica": "CR",
        "Croatia": "HR",
        "Cuba": "CU",
        "Curacao": "CW",
        "Cyprus": "CY",
        "Czech Republic": "CZ",
        "Democratic Republic of the Congo": "CD",
        "Denmark": "DK",
        "Djibouti": "DJ",
        "Dominica": "DM",
        "Dominican Republic": "DO",
        "Ecuador": "EC",
        "Egypt": "EG",
        "El Salvador": "SV",
        "Equatorial Guinea": "GQ",
        "Eritrea": "ER",
        "Estonia": "EE",
        "Ethiopia": "ET",
        "Faroe Islands": "FO",
        "Fiji": "FJ",
        "Finland": "FI",
        "France": "FR",
        "French Guiana": "GF",
        "French Polynesia": "PF",
        "Gabon": "GA",
        "Gambia": "GM",
        "Georgia": "GE",
        "Germany": "DE",
        "Ghana": "GH",
        "Gibraltar": "GI",
        "Greece": "GR",
        "Greenland": "GL",
        "Grenada": "GD",
        "Guadeloupe": "GP",
        "Guam": "GU",
        "Guatemala": "GT",
        "Guernsey": "GG",
        "Guinea": "GN",
        "Guinea-Bissau": "GW",
        "Guyana": "GY",
        "Haiti": "HT",
        "Honduras": "HN",
        "Hong Kong": "HK",
        "Hungary": "HU",
        "Iceland": "IS",
        "India": "IN",
        "Indonesia": "ID",
        "Iraq": "IQ",
        "Ireland": "IE",
        "Isle of Man": "IM",
        "Israel": "IL",
        "Italy": "IT",
        "Ivory Coast": "CI",
        "Jamaica": "JM",
        "Japan": "JP",
        "Jersey": "JE",
        "Jordan": "JO",
        "Kazakhstan": "KZ",
        "Kenya": "KE",
        "Kiribati": "KI",
        "Kuwait": "KW",
        "Kyrgyzstan": "KG",
        "Laos": "LA",
        "Latvia": "LV",
        "Lebanon": "LB",
        "Lesotho": "LS",
        "Liberia": "LR",
        "Liechtenstein": "LI",
        "Lithuania": "LT",
        "Luxembourg": "LU",
        "Macao": "MO",
        "Macedonia": "MK",
        "Madagascar": "MG",
        "Malawi": "MW",
        "Malaysia": "MY",
        "Maldives": "MV",
        "Mali": "ML",
        "Malta": "MT",
        "Martinique": "MQ",
        "Mauritania": "MR",
        "Mauritius": "MU",
        "Mayotte": "YT",
        "Mexico": "MX",
        "Micronesia": "FM",
        "Moldova": "MD",
        "Monaco": "MC",
        "Mongolia": "MN",
        "Montenegro": "ME",
        "Montserrat": "MS",
        "Morocco": "MA",
        "Mozambique": "MZ",
        "Myanmar": "MM",
        "Namibia": "NA",
        "Nepal": "NP",
        "Netherlands": "NL",
        "New Caledonia": "NC",
        "New Zealand": "NZ",
        "Nicaragua": "NI",
        "Niger": "NE",
        "Nigeria": "NG",
        "Niue": "NU",
        "Norfolk Island": "NF",
        "Northern Mariana Islands": "MP",
        "Norway": "NO",
        "Oman": "OM",
        "Pakistan": "PK",
        "Palau": "PW",
        "Palestinian Territory": "PS",
        "Panama": "PA",
        "Papua New Guinea": "PG",
        "Paraguay": "PY",
        "Peru": "PE",
        "Philippines": "PH",
        "Poland": "PL",
        "Portugal": "PT",
        "Puerto Rico": "PR",
        "Qatar": "QA",
        "Republic of the Congo": "CG",
        "Reunion": "RE",
        "Romania": "RO",
        "Rwanda": "RW",
        "Saint Barthelemy": "BL",
        "Saint Kitts and Nevis": "KN",
        "Saint Lucia": "LC",
        "Saint Martin": "MF",
        "Saint Pierre and Miquelon": "PM",
        "Saint Vincent and the Grenadines": "VC",
        "Samoa": "WS",
        "San Marino": "SM",
        "Sao Tome and Principe": "ST",
        "Saudi Arabia": "SA",
        "Senegal": "SN",
        "Serbia": "RS",
        "Seychelles": "SC",
        "Sierra Leone": "SL",
        "Singapore": "SG",
        "Sint Maarten": "SX",
        "Slovakia": "SK",
        "Slovenia": "SI",
        "Solomon Islands": "SB",
        "South Africa": "ZA",
        "South Korea": "KR",
        "Spain": "ES",
        "Sri Lanka": "LK",
        "Sudan": "SD",
        "Suriname": "SR",
        "Svalbard and Jan Mayen": "SJ",
        "Swaziland": "SZ",
        "Sweden": "SE",
        "Switzerland": "CH",
        "Taiwan": "TW",
        "Tajikistan": "TJ",
        "Tanzania": "TZ",
        "Thailand": "TH",
        "Togo": "TG",
        "Tonga": "TO",
        "Trinidad and Tobago": "TT",
        "Tunisia": "TN",
        "Turkey": "TR",
        "Turkmenistan": "TM",
        "Turks and Caicos Islands": "TC",
        "U.S. Virgin Islands": "VI",
        "Uganda": "UG",
        "United Arab Emirates": "AE",
        "United Kingdom": "UK",
        "United States": "US",
        "United States Minor Outlying Islands": "UM",
        "Uruguay": "UY",
        "Uzbekistan": "UZ",
        "Vanuatu": "VU",
        "Vietnam": "VN",
        "Wallis and Futuna": "WF",
        "Zambia": "ZM",
        "Zimbabwe": "ZW",
        "":""
        # Add more mappings as needed
    }
    return country_code_mapping.get(country_name, "Unknown")

# Register the function as a UDF
get_country_code_udf = F.udf(get_country_code, StringType())

In [8]:
target_schema = StructType([
    StructField("check_in", StringType(), True),
    StructField("check_out", StringType(), True),
    StructField("policy", StructType([
        StructField("pet_policy", StringType(), True),
        StructField("child_policy", StringType(), True)
    ]), True),
    StructField("country_code", StringType(), True)
])

In [19]:
# Create the transformed DataFrame with the policy column as a struct
transformed_df = df.select(
    coalesce(col("propertyId.expedia"), lit("")).alias("expedia_id"),
    when(col("checkInStartTime").isNull(), lit(""))
      .otherwise(col("checkInStartTime"))
      .alias("check_in"),
    when(col("checkOutTime").isNull(), lit(""))
      .otherwise(col("checkOutTime"))
      .alias("check_out"),
    struct(
        remove_html_tags_udf(coalesce(col("petPolicy"), lit([""]))).alias("pet_policy"),
        remove_html_tags_udf(coalesce(col("childrenAndExtraBedPolicy"), lit([""]))).alias("child_policy")
    ).alias("policy"),
    get_country_code_udf(coalesce(col("country"), lit(""))).alias("country_code")
)

transformed_df.show(50, truncate = False)

+----------+--------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|expedia_id|check_in|check_out|policy                                                                                                                                                                                                                                                                                                                                                                                                                                                                    

In [20]:
transformed_df.printSchema()

root
 |-- expedia_id: string (nullable = false)
 |-- check_in: string (nullable = true)
 |-- check_out: string (nullable = true)
 |-- policy: struct (nullable = false)
 |    |-- pet_policy: string (nullable = true)
 |    |-- child_policy: string (nullable = true)
 |-- country_code: string (nullable = true)



In [24]:
transformed_df.count()

                                                                                

1000000

In [25]:
transformed_df=transformed_df.dropDuplicates(["expedia_id"])
transformed_df.count()

                                                                                

1000000

In [29]:

spark.sql(f"""
            CREATE TABLE IF NOT EXISTS local.default.lodging_policies (
                expedia_id STRING,
                check_in STRING,
                check_out STRING,
                policy struct<pet_policy: STRING, child_policy: STRING>,
                country_code  STRING,
                PRIMARY KEY (expedia_id)
            )
            USING iceberg
            PARTITIONED BY (country_code)
        """)

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'expedia_id'.(line 8, pos 29)

== SQL ==

            CREATE TABLE IF NOT EXISTS local.default.lodging_policies (
                expedia_id STRING,
                check_in STRING,
                check_out STRING,
                policy struct<pet_policy: STRING, child_policy: STRING>,
                country_code  STRING,
                PRIMARY KEY (expedia_id)
-----------------------------^^^
            )
            USING iceberg
            PARTITIONED BY (country_code)
        


In [22]:
transformed_df.writeTo("local.db.lodging_policies") \
    .partitionedBy("country_code") \
    .using("iceberg") \
    .tableProperty("write.format.default", "parquet") \
    .tableProperty("write.parquet.compression-codec", "snappy") \
    .createOrReplace()

25/01/13 17:15:16 WARN HadoopTableOperations: Error reading version hint file warehouse/db/lodging_policies/metadata/version-hint.text
java.io.FileNotFoundException: File warehouse/db/lodging_policies/metadata/version-hint.text does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:318)
	at org.apache.iceberg.hadoop.HadoopTableOperations

In [12]:
spark.sql("SELECT count(*) as total_records, country_code FROM local.db.lodging_policies GROUP BY country_code ORDER BY total_records DESC").show(50)



+-------------+------------+
|total_records|country_code|
+-------------+------------+
|       317580|          US|
|       106879|          FR|
|        65310|          IT|
|        64297|          DE|
|        53694|          ES|
|        43225|          UK|
|        43080|          HR|
|        24046|          AU|
|        18076|          GR|
|        17312|          BR|
|        16421|          MX|
|        15844|          PT|
|        14035|          CA|
|        13100|          DK|
|        10969|          AT|
|         9582|          JP|
|         8275|          ID|
|         7761|          TH|
|         7752|          TR|
|         7171|          IN|
|         6497|          NZ|
|         6230|          NL|
|         6114|          CH|
|         5582|          SE|
|         4812|          PL|
|         3719|          DO|
|         3706|          KR|
|         3680|          VN|
|         3670|          ZA|
|         3651|          CR|
|         3539|          MA|
|         3496

                                                                                

In [13]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+

