### Download dataset and upload to lakehouse

Download the Open Food Facts product dataset and store it in a Fabric lakehouse. This code fetches a publicly available export of the dataset from the Open Food Facts static file server and writes it to the lakehouse attached to the notebook.

> [!IMPORTANT]
> [Add a lakehouse](https://aka.ms/fabric/addlakehouse) to the notebook before running it. **Failure to do so results in an error.**

<h5>This code downloads demo data files into a lakehouse directory if they don't already exist.</h5>
<hr>

**Variables:**

<p><i>DATA_FOLDER:</i> Name of the folder containing the dataset.<br>
<i>DATA_FILE:</i> Name of the data file to be downloaded.<br>
<i>remote_url:</i> URL where the data files are hosted.<br>
<i>file_list:</i> List of files to be downloaded.<br>
<i>download_path:</i> Path to download files into.</p>

**File Download Process:**

<p>Checks whether the default lakehouse directory exists and raises an error if it does not.<br>
Creates the download directory inside the lakehouse if it is missing.<br>
Iterates through the list of files to be downloaded.<br>
For each file, checks whether it already exists in the download path. If not, downloads the file from the remote URL and writes it into that directory.</p>

In [1]:

# Importing necessary libraries
import os
import requests

# Setting up folder and file names
DATA_FOLDER = "Files/OpenFoodFactsZip/"  # Folder containing the dataset
DATA_FILE = "en.openfoodfacts.org.products.csv.gz"  # Data file name

# Remote URL where data files are hosted
remote_url = "https://static.openfoodfacts.org/data/"

# List of files to be downloaded
file_list = [DATA_FILE]

# Path to download files into
download_path = f"/lakehouse/default/{DATA_FOLDER}raw"  # DATA_FOLDER already ends with "/"

# Checking if default lakehouse exists, if not, raise an error
if not os.path.exists("/lakehouse/default"):
    raise FileNotFoundError(
        "Default lakehouse not found, please add a lakehouse and restart the session."
    )

# Creating the directory structure if it doesn't exist
os.makedirs(download_path, exist_ok=True)

# Iterating through the file list and downloading files if they don't exist
for fname in file_list:
    if not os.path.exists(f"{download_path}/{fname}"):
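        # Fetching the file from the remote URL (remote_url already ends with "/")
        r = requests.get(f"{remote_url}{fname}", timeout=30)
        # Stopping on an HTTP error instead of saving an error page as data
        r.raise_for_status()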
        # Writing the downloaded content into the appropriate directory
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)

# Informing the user that files have been downloaded
print("Downloaded demo data files into lakehouse.")


Downloaded demo data files into lakehouse.
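
The cell above holds the entire response in memory (`r.content`) before writing it to disk. For a large archive, a streamed download is a common alternative. The following is a minimal sketch, not part of the original notebook: `save_stream` and `download_if_missing` are hypothetical helper names, the 1 MiB chunk size is an arbitrary choice, and the sketch assumes that renaming a file works on the mounted lakehouse path.

```python
import os


def save_stream(chunks, dest_path):
    """Write an iterable of byte chunks to dest_path via a temporary file."""
    os.makedirs(os.path.dirname(dest_path) or ".", exist_ok=True)
    tmp_path = dest_path + ".part"  # hypothetical suffix for the partial file
    with open(tmp_path, "wb") as f:
        for chunk in chunks:
            if chunk:  # skip empty keep-alive chunks
                f.write(chunk)
    # Rename only after the full file is written, so an interrupted
    # download does not leave a truncated file under the final name.
    # Assumption: the lakehouse mount supports rename.
    os.replace(tmp_path, dest_path)


def download_if_missing(url, dest_path, chunk_size=1024 * 1024):
    """Download url to dest_path unless the file already exists.

    Returns True if a download happened, False if the file was already there.
    """
    if os.path.exists(dest_path):
        return False
    import requests  # imported here so the helper above has no third-party dependency

    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        save_stream(r.iter_content(chunk_size=chunk_size), dest_path)
    return True


# Example usage with the names defined in the download cell:
# download_if_missing(f"{remote_url}{DATA_FILE}", f"{download_path}/{DATA_FILE}")
```

Because the existence check only looks at the final file name, a leftover `.part` file from an interrupted run is simply overwritten on the next attempt.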


<h5>This code reads the gzip-compressed, tab-separated data file into a Spark DataFrame using the provided options, and then displays the first 5 rows of the DataFrame.</h5>
<hr>

<b>DataFrame Creation:</b>

<p><i>spark.read</i> returns a reader that loads data into a DataFrame.<br>
The options <i>header</i>, <i>sep</i> (separator), and <i>inferSchema</i> tell the reader that the first row holds the column names, that fields are separated by tabs, and that column types should be inferred from the data.<br>
The csv method reads the file at the path <i>f"{DATA_FOLDER}raw/en.openfoodfacts.org.products.csv.gz"</i>.<br>
The cache method marks the DataFrame for caching so that later actions can reuse it instead of rereading the file.</p>

<b>Displaying DataFrame:</b>

The show method is used to display the first 5 rows of the DataFrame.
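
Before handing the file to Spark, it can be useful to confirm the separator and look at the column names using only the Python standard library. This is a minimal sketch rather than part of the original notebook: `peek_header` is a hypothetical helper name, and UTF-8 encoding is an assumption about the file.

```python
import csv
import gzip


def peek_header(path, sep="\t", encoding="utf-8"):
    """Return the column names from the first line of a gzip-compressed delimited file."""
    with gzip.open(path, mode="rt", encoding=encoding, newline="") as f:
        # QUOTE_NONE treats quote characters as ordinary text
        reader = csv.reader(f, delimiter=sep, quoting=csv.QUOTE_NONE)
        return next(reader)


# Example usage with the names defined in the download cell:
# columns = peek_header(f"/lakehouse/default/{DATA_FOLDER}raw/{DATA_FILE}")
# print(len(columns), columns[:5])
```

Only the first line is decompressed and parsed, so the check stays cheap even for a large archive. If the result is a single long string, the separator is probably not a tab.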

In [4]:
# Reading data into a Spark DataFrame
df = (
    spark.read.option("header", True)  # Treating the first row as column names
    .option("sep", '\t')  # Specifying the separator as tab
    .option("inferSchema", True)  # Inferring the schema
    .csv(f"{DATA_FOLDER}raw/en.openfoodfacts.org.products.csv.gz")  # Reading the CSV file
    .cache()  # Caching the DataFrame for better performance
)

# Displaying the first 5 rows of the DataFrame
df.show(5)


<h5>This code replaces spaces in the column names of the DataFrame with underscores to avoid invalid characters while saving.</h5>
<hr>

<b>Column Name Replacement:</b>

<p>It iterates over each column name in the DataFrame using a generator expression <i>(c.replace(' ', '_') for c in df.columns)</i>.<br>
For each column name, it replaces spaces with underscores using the replace method.<br>
The resulting column names are used to create a new DataFrame with updated column names.</p>

<b>Table Name Definition:</b>

The variable table_name is assigned the value <i>products_zip</i> to define the name of the table where the DataFrame will be saved.

<b>Displaying DataFrame:</b>

The show method is used to display the first 5 rows of the DataFrame after updating the column names.
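
Spaces are not the only characters that can cause trouble when saving. Without column mapping enabled, Delta Lake rejects column names containing any of the characters in `' ,;{}()\n\t='`. The following is a minimal sketch of a more general cleanup, not part of the original notebook: `sanitize_column_names` is a hypothetical helper name, and the numeric suffix used for duplicate names is an arbitrary choice.

```python
import re

# Characters Delta Lake rejects in column names (space , ; { } ( ) newline tab =)
_INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def sanitize_column_names(columns):
    """Replace invalid characters with underscores and de-duplicate the results."""
    seen = {}
    result = []
    for name in columns:
        clean = _INVALID_CHARS.sub("_", name)
        count = seen.get(clean, 0)
        seen[clean] = count + 1
        # Append a numeric suffix when two names collapse to the same value
        result.append(clean if count == 0 else f"{clean}_{count}")
    return result


# Example usage with a Spark DataFrame named df:
# df = df.toDF(*sanitize_column_names(df.columns))
```

The suffix scheme does not guard against a generated name such as `name_1` colliding with a column that was already called `name_1`. For a schema where that can happen, a stricter check would be needed.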

In [5]:
# Replacing spaces in column names with underscores to avoid invalid characters while saving
df = df.toDF(*(c.replace(' ', '_') for c in df.columns))

# Defining the table name
table_name = "products_zip"

# Displaying the first 5 rows of the DataFrame with updated column names
df.show(5)


<h5>This code saves the DataFrame with processed columns to the lakehouse in the Delta format.</h5>
<hr>

<b>DataFrame Saving:</b>

<p>The write mode is set to <i>overwrite</i> with the mode method, which means any existing data at the target path is replaced.<br>
The format for saving the data is specified as <i>delta</i>.<br>
The data is saved to the path specified by <i>f"Tables/{table_name}"</i>.</p>

<b>Confirmation:</b>

After saving the DataFrame, a confirmation message is printed indicating that the Spark DataFrame has been saved to the Delta table named <i>products_zip</i>.

In [6]:
# Saving the DataFrame with processed columns to the lakehouse
df.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")

# Printing a confirmation message
print(f"Spark dataframe saved to delta table: {table_name}")


Spark dataframe saved to delta table: products_zip
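
To confirm the write, the table can be read back from the same path. This is a minimal sketch, assuming the `spark` session and the `table_name` variable from the cells above; `read_back_summary` is a hypothetical helper name and not part of the original notebook.

```python
def read_back_summary(spark, table_name):
    """Load the saved Delta table and return its row and column counts."""
    saved = spark.read.format("delta").load(f"Tables/{table_name}")
    return {"rows": saved.count(), "columns": len(saved.columns)}


# Example usage in the notebook session:
# summary = read_back_summary(spark, table_name)
# print(summary)
```

Comparing the returned counts with `df.count()` and `len(df.columns)` gives a quick check that the saved table matches the DataFrame that was written.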
