In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) a SparkSession for this notebook.
# "local[*]" runs Spark locally using all available CPU cores.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("Reading and Parsing JSON Files/Data")
)
spark = session_builder.getOrCreate()

# Leaving the session as the last expression renders its summary in Jupyter.
spark

25/01/11 23:23:34 WARN Utils: Your hostname, Aadityas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.128 instead (on interface en0)
25/01/11 23:23:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/11 23:23:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/11 23:23:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/11 23:23:35 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/01/11 23:23:35 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/01/11 23:23:35 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


25/01/11 23:23:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
# Load the JSON orders file without a schema, so Spark infers one.
# DataFrameReader.json(path) is shorthand for .format("json").load(path).
df_single = spark.read.json("datasets/order_singleline.json")

In [4]:
# Print the inferred schema as a tree; nested array/struct members are indented.
df_single.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



In [5]:
# Reading the inferred schema above:
# - contact is an array whose elements are of type long.
# - order_line_items is also an array.
# - Each element of order_line_items is a struct (think: a dictionary) whose keys are:
#     amount  -> double
#     item_id -> string
#     qty     -> long

In [6]:
# show() with its default settings spelled out: up to 20 rows, and long values
# are truncated with "...".
df_single.show(n=20, truncate=True)


+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [7]:
# To get rid of the "..." truncation, pass truncate=False so every value is
# printed in full.
df_single.show(n=20, truncate=False)

+------------------------+-----------+--------+------------------------------------+
|contact                 |customer_id|order_id|order_line_items                    |
+------------------------+-----------+--------+------------------------------------+
|[9000010000, 9000010001]|C001       |O101    |[{102.45, I001, 6}, {2.01, I003, 2}]|
+------------------------+-----------+--------+------------------------------------+



In [9]:
# Supplying an explicit DDL schema reads only the listed columns, in the order
# they are declared; other JSON keys are left out of the DataFrame.
_schema = "customer_id string, order_id string, contact array<long>"

df_schema = (
    spark.read
    .schema(_schema)
    .format("json")
    .load("datasets/order_singleline.json")
)

In [11]:
# Columns come out in schema order: customer_id, order_id, contact.
df_schema.show(n=20, truncate=True)

+-----------+--------+--------------------+
|customer_id|order_id|             contact|
+-----------+--------+--------------------+
|       C001|    O101|[9000010000, 9000...|
+-----------+--------+--------------------+



In [13]:
# Full (nested) schema for the orders file, written as a DDL string.
# The column names here must match the JSON keys exactly.
# NOTE: contact is declared as array<string> (the inferred type is array<long>),
# so the phone numbers are read as strings in DataFrames built from this schema.
_schema = (
    "contact array<string>, "
    "customer_id string, "
    "order_id string, "
    "order_line_items array<struct<amount double, item_id string, qty long>>"
)

In [14]:
# Re-read the same file, this time applying the nested schema from the previous cell.
df_schema_new = spark.read.schema(_schema).json("datasets/order_singleline.json")


In [16]:
# Confirm the explicit schema was applied: contact elements are now string, not long.
df_schema_new.printSchema()

root
 |-- contact: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- qty: long (nullable = true)



# Important
### **Notes on `explode` and `.*` in PySpark**

---

### **`explode`**
`explode` turns a column containing arrays or maps (Spark's dictionary-like type) into multiple rows: one output row per array element or per key-value pair. It is particularly useful when dealing with nested or complex data structures.

#### **Key Points:**
1. **Arrays:**
   - Each element in the array becomes a new row.
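2. **Maps (dictionaries):**
   - Each key-value pair becomes a new row, with the key and the value in separate columns (named `key` and `value` unless aliased).
3. **Null or Empty Data:**
   - A null or empty array/map produces no rows, so the whole input row disappears from the result. To keep such rows, use `explode_outer` instead; it emits `null` in the exploded column(s).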

#### **Syntax:**
```python
from pyspark.sql.functions import explode

# For arrays
df_exploded = df.select("column_name", explode("array_column").alias("exploded_value"))

# For dictionaries
df_exploded = df.select("column_name", explode("map_column").alias("key", "value"))
```

#### **Examples:**

1. **Array Example:**
   ```python
   data = [("John", [1, 2, 3]), ("Jane", [4, 5]), ("Doe", [])]
   df = spark.createDataFrame(data, ["name", "numbers"])
   df_exploded = df.select("name", explode("numbers").alias("number"))
   ```
   **Result:**
   | name | number |
   |------|--------|
   | John | 1      |
   | John | 2      |
   | John | 3      |
   | Jane | 4      |
   | Jane | 5      |

2. **Dictionary Example:**
   ```python
   data = [("John", {"a": 1, "b": 2}), ("Jane", {"x": 10, "y": 20}), ("Doe", None)]
   df = spark.createDataFrame(data, ["name", "attributes"])
   df_exploded = df.select("name", explode("attributes").alias("key", "value"))
   ```
   **Result:**
   | name | key | value |
   |------|-----|-------|
   | John | a   | 1     |
   | John | b   | 2     |
   | Jane | x   | 10    |
   | Jane | y   | 20    |

---

### **`.*`**
`.*` (star expansion) selects all the fields within a struct column and brings them to the top level as individual columns.

#### **Key Points:**
1. **Used with Struct Columns:**
   - Extracts all fields of a struct into separate columns.
2. **Simplifies Column Selection:**
   - No need to manually select each field of a struct column.
3. **Works Only with Structs:**
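   - If applied to a non-struct column, Spark raises an `AnalysisException`. For example, an array of structs such as `order_line_items` must first be exploded and only then expanded with `.*`.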

#### **Syntax:**
```python
df_selected = df.select("column_name", "struct_column.*")
```

#### **Example Using `.*`:**

Instead of manually specifying each field like this (here `expanded_line_items` is a struct column, e.g. the result of exploding `order_line_items`):
```python
df.select(
    "contact",
    "customer_id",
    "order_id",
    df.expanded_line_items.amount,
    df.expanded_line_items.item_id,
    df.expanded_line_items.qty
)
```

You can write:
```python
df.select(
    "contact",
    "customer_id",
    "order_id",
    "expanded_line_items.*"
)
```

**Result:**
| contact | customer_id | order_id | amount | item_id | qty |
|---------|-------------|----------|--------|---------|-----|
| ...     | ...         | ...      | ...    | ...     | ... |

---

### **Comparison of `explode` and `.*`**
| Feature          | `explode`                              | `.*`                          |
|------------------|----------------------------------------|-------------------------------|
| **Use Case**     | Flatten arrays or maps (dictionaries)  | Extract fields from structs   |
| **Output**       | Multiple rows                          | Multiple columns              |
| **Applies To**   | Arrays, maps                           | Structs                       |
| **Behavior**     | Creates a row for each element/entry   | Expands fields into columns   |

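**Summary:** use `explode` to turn the elements of an array (or the entries of a map) into rows, and `.*` to turn the fields of a struct into columns. Nested data such as `order_line_items` (an array of structs) usually needs both: explode first, then expand.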

In [18]:
# Takeaways:
# - explode creates one row for each element of the array.
# - .* turns each field of a struct (dictionary) into its own column: the field
#   name becomes the column name and the field value becomes the column value.

In [19]:
# Load the orders with an explicit nested schema.
# The schema is spelled out here instead of reusing the global `_schema`, which
# this notebook binds twice (first to a three-column schema, then to the nested
# one). Reusing it made this cell's result depend on which of those cells ran last.
# NOTE: contact is read as array<string> here, unlike the inferred array<long>.
order_schema = (
    "contact array<string>, customer_id string, order_id string, "
    "order_line_items array<struct<amount double, item_id string, qty long>>"
)
df_one = spark.read.format("json").schema(order_schema).load("datasets/order_singleline.json")

In [20]:
# Default display: up to 20 rows, with long array values truncated.
df_one.show(n=20, truncate=True)

+--------------------+-----------+--------+--------------------+
|             contact|customer_id|order_id|    order_line_items|
+--------------------+-----------+--------+--------------------+
|[9000010000, 9000...|       C001|    O101|[{102.45, I001, 6...|
+--------------------+-----------+--------+--------------------+



In [27]:
# Split contact into rows: each phone number in the array gets its own row
# (named "number"), and the other columns are repeated on every resulting row.
from pyspark.sql.functions import explode

df_two = df_one.select(
    explode("contact").alias("number"),
    "customer_id",
    "order_id",
    "order_line_items",
)

In [28]:
# One row per phone number; order_line_items is still an (unexploded) array.
df_two.show(n=20, truncate=True)

+----------+-----------+--------+--------------------+
|    number|customer_id|order_id|    order_line_items|
+----------+-----------+--------+--------------------+
|9000010000|       C001|    O101|[{102.45, I001, 6...|
|9000010001|       C001|    O101|[{102.45, I001, 6...|
+----------+-----------+--------+--------------------+



In [30]:
# order_line_items needs to be exploded too: each (number, line item) combination
# becomes its own row, and each exploded struct keeps the column name order_line_items.
df_three = df_two.select(
    "customer_id",
    "order_id",
    "number",
    explode("order_line_items").alias("order_line_items"),
)

In [31]:
# 2 numbers x 2 line items = 4 rows; each order_line_items cell is now a single struct.
df_three.show(n=20, truncate=True)

+-----------+--------+----------+-----------------+
|customer_id|order_id|    number| order_line_items|
+-----------+--------+----------+-----------------+
|       C001|    O101|9000010000|{102.45, I001, 6}|
|       C001|    O101|9000010000|  {2.01, I003, 2}|
|       C001|    O101|9000010001|{102.45, I001, 6}|
|       C001|    O101|9000010001|  {2.01, I003, 2}|
+-----------+--------+----------+-----------------+



In [38]:
# After both explodes the other columns repeat, and number takes each of its 2 values.
# Each row's order_line_items now holds a single struct, so one field can be selected
# with a dotted path, "struct_column.field". A Python dict would use name["key"] instead.
# Pick whichever fields you need by checking the keys in the JSON file.
tf_four = df_three.select("number", "customer_id", "order_id", "order_line_items.qty")

In [39]:
# The selected struct field appears as a top-level column named after the field (qty).
tf_four.show(n=20, truncate=True)

+----------+-----------+--------+---+
|    number|customer_id|order_id|qty|
+----------+-----------+--------+---+
|9000010000|       C001|    O101|  6|
|9000010000|       C001|    O101|  2|
|9000010001|       C001|    O101|  6|
|9000010001|       C001|    O101|  2|
+----------+-----------+--------+---+



In [41]:
# To pull out every field of the struct at once, use "order_line_items.*".
# Pass it as a plain column-name string. Indexing the DataFrame with
# df_three["order_line_items.*"] does not work.
tf_five = df_three.select("number", "customer_id", "order_id", "order_line_items.*")


In [42]:
# amount, item_id and qty are now separate top-level columns.
tf_five.show(n=20, truncate=True)

+----------+-----------+--------+------+-------+---+
|    number|customer_id|order_id|amount|item_id|qty|
+----------+-----------+--------+------+-------+---+
|9000010000|       C001|    O101|102.45|   I001|  6|
|9000010000|       C001|    O101|  2.01|   I003|  2|
|9000010001|       C001|    O101|102.45|   I001|  6|
|9000010001|       C001|    O101|  2.01|   I003|  2|
+----------+-----------+--------+------+-------+---+

