## Connecting Databricks to Blob Storage

In [0]:
# Import PySpark, the SparkSession entry point, the Spark SQL functions, and the DBUtils wrapper class
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.dbutils import DBUtils

In [0]:
# Obtain a SparkSession (creating one when none is available yet).
# All Spark operations in this notebook go through this handle.
spark = (
    SparkSession.builder
    .appName("Blob Storage Mount")
    .getOrCreate()
)

In [0]:
# Azure Blob Storage connection details
account_name = "YOUR_ACCOUNT_NAME"
container_name = "YOUR_CONTAINER_NAME"

# Fetch the storage account key from a Databricks secret scope instead of pasting it
# into the notebook, where it would be stored in plain text with the notebook source.
# Replace YOUR_SECRET_SCOPE / YOUR_ACCESS_KEY_SECRET_NAME with the scope and key you created.
access_key = dbutils.secrets.get(scope="YOUR_SECRET_SCOPE", key="YOUR_ACCESS_KEY_SECRET_NAME")

In [0]:
# Mount Blob Storage container
# Replace <mount_name> with the desired mount point name. This will be the directory where
# your Blob Storage container will be mounted in the Databricks file system.
blob_mount_point = "/mnt/<mount_name>"
blob_account_host = f"{account_name}.blob.core.windows.net"
blob_source = f"wasbs://{container_name}@{blob_account_host}"

# dbutils.fs.mount raises when the mount point is already in use, which makes this cell
# fail on every re-run. Skip the call only when this exact source is already mounted there;
# if the path is taken by a different source, the mount call still raises as before.
mounted_sources = {m.mountPoint: m.source for m in dbutils.fs.mounts()}
if mounted_sources.get(blob_mount_point) != blob_source:
    dbutils.fs.mount(
        source=blob_source,
        mount_point=blob_mount_point,
        extra_configs={f"fs.azure.account.key.{blob_account_host}": access_key},
    )

In [0]:
# Unmount Blob Storage container (optional)
# dbutils.fs.unmount raises when nothing is mounted at the path, so only call it
# when the mount point is actually present.
blob_mount_to_remove = "/mnt/<mount_name>"
if any(m.mountPoint == blob_mount_to_remove for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(blob_mount_to_remove)

## Connecting Databricks to ADLS

In [0]:
# Define your ADLS Gen2 account details
adls_account_name = "YOUR_ADLS_ACCOUNT_NAME"
adls_container_name = "YOUR_CONTAINER_NAME"
adls_client_id = "YOUR_SERVICE_PRINCIPAL_CLIENT_ID"
adls_tenant_id = "YOUR_TENANT_ID"

# Fetch the service principal secret from a Databricks secret scope instead of pasting it
# into the notebook, where it would be stored in plain text with the notebook source.
# Replace YOUR_SECRET_SCOPE / YOUR_CLIENT_SECRET_NAME with the scope and key you created.
adls_client_secret = dbutils.secrets.get(scope="YOUR_SECRET_SCOPE", key="YOUR_CLIENT_SECRET_NAME")

In [0]:
# Mount ADLS Gen2 using OAuth client credentials of a service principal
adls_mount_point = "/mnt/<mount_name>"
adls_source = f"abfss://{adls_container_name}@{adls_account_name}.dfs.core.windows.net/"
adls_oauth_configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": adls_client_id,
    "fs.azure.account.oauth2.client.secret": adls_client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{adls_tenant_id}/oauth2/token",
}

# dbutils.fs.mount raises when the mount point is already in use, which makes this cell
# fail on every re-run. Skip the call only when this exact source is already mounted there;
# if the path is taken by a different source, the mount call still raises as before.
mounted_sources = {m.mountPoint: m.source for m in dbutils.fs.mounts()}
if mounted_sources.get(adls_mount_point) != adls_source:
    dbutils.fs.mount(
        source=adls_source,
        mount_point=adls_mount_point,
        extra_configs=adls_oauth_configs,
    )


Replace the placeholders YOUR_ADLS_ACCOUNT_NAME, YOUR_CONTAINER_NAME, YOUR_SERVICE_PRINCIPAL_CLIENT_ID, YOUR_SERVICE_PRINCIPAL_CLIENT_SECRET, and YOUR_TENANT_ID with your actual ADLS Gen2 account details.

Make sure to provide the appropriate permissions to the service principal associated with the client ID and client secret. The service principal should have sufficient permissions to access the ADLS Gen2 account and the specified container.

Replace `<mount_name>` with the desired mount point name. This will be the directory where your ADLS Gen2 container will be mounted in the Databricks file system.

In [0]:
# Unmount ADLS Gen2 (optional)
# dbutils.fs.unmount raises when nothing is mounted at the path, so only call it
# when the mount point is actually present.
adls_mount_to_remove = "/mnt/<mount_name>"
if any(m.mountPoint == adls_mount_to_remove for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(adls_mount_to_remove)

## Spark API for reading data

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Load a CSV file into a Spark DataFrame through the generic reader
df = spark.read.format("csv").load("path/to/csv/file")

In [0]:
# Load a JSON file into a Spark DataFrame through the generic reader
df = spark.read.format("json").load("path/to/json/file")

In [0]:
# Load a Parquet file into a Spark DataFrame through the generic reader
df = spark.read.load("path/to/parquet/file", format="parquet")

In [0]:
# Read a database table over JDBC into a Spark DataFrame
df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://hostname:port/database")
    .option("dbtable", "table_name")
    .load()
)

In [0]:
# Subscribe to a Kafka topic as a streaming DataFrame
kafka_reader = spark.readStream.format("kafka")
kafka_reader = kafka_reader.option("kafka.bootstrap.servers", "host:port")
kafka_reader = kafka_reader.option("subscribe", "topic")
df = kafka_reader.load()

In [0]:
# Print the schema (column names and types) of the DataFrame currently bound to `df`
df.printSchema()

In [0]:
# Preview the DataFrame currently bound to `df`.
# show() raises an AnalysisException on a streaming DataFrame (such as the one produced
# by the Kafka readStream example), so streaming frames are rendered with the
# Databricks notebook display() helper instead.
if df.isStreaming:
    display(df)
else:
    df.show()

## Pandas API for reading data

In [0]:
import pandas as pd

# Load a CSV file into a pandas DataFrame
df = pd.read_csv(filepath_or_buffer="file.csv")

In [0]:
import pandas as pd

# Load one worksheet of an Excel workbook into a pandas DataFrame
df = pd.read_excel(io="file.xlsx", sheet_name="Sheet1")

In [0]:
import pandas as pd

# Load a JSON file into a pandas DataFrame
df = pd.read_json(path_or_buf="file.json")

In [0]:
import pandas as pd

# Load a Parquet file into a pandas DataFrame
df = pd.read_parquet(path="file.parquet")

In [0]:
import pandas as pd

# Load the object stored under the "data" key of an HDF5 file into a pandas DataFrame
df = pd.read_hdf(path_or_buf="file.h5", key="data")

In [0]:
import pandas as pd
from sqlalchemy import create_engine

# Open a SQLAlchemy engine on the SQLite file, then pull the query result into pandas
engine = create_engine('sqlite:///file.db')
df = pd.read_sql(sql="SELECT * FROM table_name", con=engine)

## Writing Data

In [0]:
# Persist a Spark DataFrame to a relational database over JDBC (e.g., Azure SQL Database).
# `df` must be a Spark DataFrame here, since the write goes through `df.write`.
# For Amazon RDS or Google Cloud SQL, swap in the appropriate JDBC connection string.
jdbc_url = "jdbc:sqlserver://<host>:<port>;databaseName=<database>"
(
    df.write
    .format("jdbc")
    .options(url=jdbc_url, dbtable="<table>")
    .save()
)


In [0]:
# Append the rows of a pandas DataFrame to a database table through a SQLAlchemy engine.
# `df` must be a pandas DataFrame here, since the write goes through `df.to_sql`.
import sqlalchemy

engine = sqlalchemy.create_engine("dialect+driver://username:password@host:port/database_name")
df.to_sql('table_name', engine, if_exists='append', index=False)

In [0]:
# Save the Spark DataFrame in Delta Lake format
df.write.save("/mnt/delta_path", format="delta")

# Save the Spark DataFrame in Apache Parquet format
df.write.save("/mnt/parquet_path", format="parquet")

# Apache Hudi is not covered here: it requires additional configuration.

In [0]:
# pandas has no built-in writer for Delta Lake or Apache Hudi.
# Writing the pandas DataFrame as Parquet is the workaround: Parquet files can be
# consumed by Delta Lake or Apache Hudi afterwards.
df.to_parquet(path="/local/path/to/output")

In [0]:
# Writing data to Apache Kafka
# The Kafka sink needs a string/binary `value` column and a destination topic (supplied
# here through the "topic" option); without them the write is rejected. Each row is
# therefore serialized to a JSON string in `value` before writing.
(
    df.selectExpr("to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "host:port")
    .option("topic", "topic")
    .save()
)

# For AWS Kinesis or Azure Event Hubs, additional configuration is required.


## Data Processing Logic

In [0]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used for the analysis
spark = (
    SparkSession.builder
    .appName("DataAnalysis")
    .getOrCreate()
)

# Load the CSV file, using its first line as column names and letting Spark infer column types
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("dbfs:/path/to/data.csv")

In [0]:
# DATA EXPLORATION

# Peek at the leading rows of the DataFrame
df.show()

# Column names and types
df.printSchema()

# Summary statistics, computed first and then displayed
summary_stats = df.describe()
summary_stats.show()

In [0]:
# DATA CLEANING
# Both results derive from the original `df`, which is left untouched.

# Variant 1: discard every row that contains a missing value
df_cleaned = df.na.drop()

# Variant 2: keep all rows and replace missing values with 0
df_filled = df.na.fill(0)

In [0]:
# DATA TRANSFORMATION AND MANIPULATION

# Keep only the rows whose age is strictly greater than 18
is_over_18 = df["age"] > 18
df_filtered = df.where(is_over_18)

# Project the DataFrame down to two columns
df_selected = df.select(["name", "age"])

# Average salary per gender
df_grouped = df.groupBy("gender").avg("salary")

In [0]:
# FEATURE ENGINEERING
from pyspark.sql.functions import when, col

# Flag expression: 1 when age is at least 18, 0 in every other case
adult_flag = when(col("age") >= 18, 1).otherwise(0)

# Attach the flag as a new column named "is_adult"
df_with_new_column = df.withColumn("is_adult", adult_flag)

In [0]:
# STATISTICAL ANALYSIS
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Correlation.corr works on a single vector column and requires that column's name,
# so the numeric columns are first packed into a "features" vector.
# handleInvalid="skip" drops rows where either input is null, which the assembler
# would otherwise reject.
assembler = VectorAssembler(
    inputCols=["age", "salary"],
    outputCol="features",
    handleInvalid="skip",
)
features_df = assembler.transform(df.select("age", "salary")).select("features")

# Compute correlation matrix (the first cell of the single result row)
correlation_matrix = Correlation.corr(features_df, "features", method="pearson").collect()[0][0]

In [0]:
# DATA VISUALIZATION
import matplotlib.pyplot as plt

# Bring the ages to the driver as a plain list. Null ages are dropped first because a
# None in the list makes the histogram computation fail. The DataFrame API is used
# instead of `.rdd`, which is not available on every cluster configuration.
ages = [row["age"] for row in df.select("age").dropna().collect()]

# Plot histogram of ages on an explicit figure/axes pair
fig, ax = plt.subplots()
ax.hist(ages, bins=20)
ax.set_xlabel("Age")
ax.set_ylabel("Frequency")
ax.set_title("Distribution of Ages")
plt.show()

## Linking Notebooks

In [0]:
# Master Notebook
#
# Each stage is started with dbutils.notebook.run(path, timeout_seconds, arguments).
# A %run command must be the only content of its cell and takes arguments as
# $name="value", so three %run lines mixed with comments cannot execute from this cell.
# Unlike %run, dbutils.notebook.run executes the child notebook in its own context:
# arguments arrive as widget values (read them with dbutils.widgets.get) and variables
# defined in the child are not shared back. A timeout of 0 means no timeout.

# Run data extraction notebook
dbutils.notebook.run(
    "/Workspace/extract_notebook",
    0,
    {"parameter1": "value1", "parameter2": "value2"},
)

# Run data transformation notebook
dbutils.notebook.run(
    "/Workspace/transform_notebook",
    0,
    {"parameter3": "value3", "parameter4": "value4"},
)

# Run data loading notebook
dbutils.notebook.run(
    "/Workspace/load_notebook",
    0,
    {"parameter5": "value5", "parameter6": "value6"},
)