In [0]:

from pyspark.sql.functions import col, sum
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [0]:
# Configure Azure storage access directly (no mount needed)
spark.conf.set("fs.azure.account.auth.type.crmanalysisdata.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.crmanalysisdata.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.crmanalysisdata.dfs.core.windows.net", "**key**")
spark.conf.set("fs.azure.account.oauth2.client.secret.crmanalysisdata.dfs.core.windows.net", "**key**")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.crmanalysisdata.dfs.core.windows.net", "https://login.microsoftonline.com/**key**/oauth2/token")

# Now access files directly using the abfss:// path
# Example: df = spark.read.csv("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/your-file.csv***")
     

In [0]:
%fs
ls "abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net"

path,name,size,modificationTime
abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/raw-data/,raw-data/,0,1770580373000
abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/transformed-data/,transformed-data/,0,1770580382000


In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0x7fa585d25e50>

In [0]:
accounts = spark.read.format("csv").option("header","true").option("inferSchema","true").load("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/raw-data/accounts.csv")
products = spark.read.format("csv").option("header","true").option("inferSchema","true").load("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/raw-data/products.csv")
salestransaction = spark.read.format("csv").option("header","true").option("inferSchema","true").load("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/raw-data/salestransaction.csv")
salesteam = spark.read.format("csv").option("header","true").option("inferSchema","true").load("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/raw-data/salesteam.csv")
     

In [0]:
accounts.show()

+----------------+-------------+----------------+-------+---------+---------------+----------------+
|         account|       sector|year_established|revenue|employees|office_location|   subsidiary_of|
+----------------+-------------+----------------+-------+---------+---------------+----------------+
|Acme Corporation|    technolgy|            1996|1100.04|     2822|  United States|            NULL|
|      Betasoloin|      medical|            1999| 251.41|      495|  United States|            NULL|
|        Betatech|      medical|            1986| 647.18|     1185|          Kenya|            NULL|
|      Bioholding|      medical|            2012| 587.34|     1356|     Philipines|            NULL|
|         Bioplex|      medical|            1991| 326.82|     1016|  United States|            NULL|
|        Blackzim|       retail|            2009| 497.11|     1588|  United States|            NULL|
|   Bluth Company|    technolgy|            1993|1242.32|     3027|  United States|Acme Cor

In [0]:
accounts.printSchema()

root
 |-- account: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- year_established: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- employees: integer (nullable = true)
 |-- office_location: string (nullable = true)
 |-- subsidiary_of: string (nullable = true)



In [0]:
null_counts = accounts.select(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in accounts.columns]
)

null_counts.show()

+-------+------+----------------+-------+---------+---------------+-------------+
|account|sector|year_established|revenue|employees|office_location|subsidiary_of|
+-------+------+----------------+-------+---------+---------------+-------------+
|      0|     0|               0|      0|        0|              0|           70|
+-------+------+----------------+-------+---------+---------------+-------------+



In [0]:
products.show()

+--------------+------+-----------+
|       product|series|sales_price|
+--------------+------+-----------+
|     GTX Basic|   GTX|        550|
|       GTX Pro|   GTX|       4821|
|    MG Special|    MG|         55|
|   MG Advanced|    MG|       3393|
|  GTX Plus Pro|   GTX|       5482|
|GTX Plus Basic|   GTX|       1096|
|       GTK 500|   GTK|      26768|
+--------------+------+-----------+



In [0]:
products.printSchema()

root
 |-- product: string (nullable = true)
 |-- series: string (nullable = true)
 |-- sales_price: integer (nullable = true)



In [0]:
salestransaction.show()

+--------------+-----------------+--------------+--------------------+----------+-----------+----------+-----------+
|opportunity_id|      sales_agent|       product|             account|deal_stage|engage_date|close_date|close_value|
+--------------+-----------------+--------------+--------------------+----------+-----------+----------+-----------+
|      1C1I7A6R|      Moses Frase|GTX Plus Basic|             Cancity|       Won| 2016-10-20|2017-03-01|       1054|
|      Z063OYW0|  Darcel Schlecht|        GTXPro|               Isdom|       Won| 2016-10-25|2017-03-11|       4514|
|      EC4QE1BX|  Darcel Schlecht|    MG Special|             Cancity|       Won| 2016-10-25|2017-03-07|         50|
|      MV1LWRNH|      Moses Frase|     GTX Basic|             Codehow|       Won| 2016-10-25|2017-03-09|        588|
|      PE84CX4O|        Zane Levy|     GTX Basic|              Hatfan|       Won| 2016-10-25|2017-03-02|        517|
|      ZNBS69V1|    Anna Snelling|    MG Special|            Ron

In [0]:
salestransaction.printSchema()

root
 |-- opportunity_id: string (nullable = true)
 |-- sales_agent: string (nullable = true)
 |-- product: string (nullable = true)
 |-- account: string (nullable = true)
 |-- deal_stage: string (nullable = true)
 |-- engage_date: date (nullable = true)
 |-- close_date: date (nullable = true)
 |-- close_value: integer (nullable = true)



In [0]:
null_counts = salestransaction.select(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in salestransaction.columns]
)

null_counts.show()

+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|opportunity_id|sales_agent|product|account|deal_stage|engage_date|close_date|close_value|
+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|             0|          0|      0|   1425|         0|        500|      2089|       2089|
+--------------+-----------+-------+-------+----------+-----------+----------+-----------+



In [0]:
salestransaction_clean = salestransaction.na.drop(subset=["account", "close_value"])
salestransaction_clean.count(), salestransaction.count()

(6711, 8800)

In [0]:
null_counts = salestransaction_clean.select(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in salestransaction_clean.columns]
)

null_counts.show()

+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|opportunity_id|sales_agent|product|account|deal_stage|engage_date|close_date|close_value|
+--------------+-----------+-------+-------+----------+-----------+----------+-----------+
|             0|          0|      0|      0|         0|          0|         0|          0|
+--------------+-----------+-------+-------+----------+-----------+----------+-----------+



In [0]:
salesteam.show()

+------------------+----------------+---------------+
|       sales_agent|         manager|regional_office|
+------------------+----------------+---------------+
|     Anna Snelling|Dustin Brinkmann|        Central|
|    Cecily Lampkin|Dustin Brinkmann|        Central|
| Versie Hillebrand|Dustin Brinkmann|        Central|
|   Lajuana Vencill|Dustin Brinkmann|        Central|
|       Moses Frase|Dustin Brinkmann|        Central|
|Jonathan Berthelot|   Melvin Marxen|        Central|
| Marty Freudenburg|   Melvin Marxen|        Central|
|  Gladys Colclough|   Melvin Marxen|        Central|
|   Niesha Huffines|   Melvin Marxen|        Central|
|   Darcel Schlecht|   Melvin Marxen|        Central|
|     Mei-Mei Johns|   Melvin Marxen|        Central|
|  Violet Mclelland|      Cara Losch|           East|
|     Corliss Cosme|      Cara Losch|           East|
|Rosie Papadopoulos|      Cara Losch|           East|
|     Garret Kinder|      Cara Losch|           East|
|    Wilburn Farren|      Ca

In [0]:
salesteam.printSchema()

root
 |-- sales_agent: string (nullable = true)
 |-- manager: string (nullable = true)
 |-- regional_office: string (nullable = true)



In [0]:
null_counts = salesteam.select(
    *[sum(col(c).isNull().cast("int")).alias(c) for c in salesteam.columns]
)

null_counts.show()

+-----------+-------+---------------+
|sales_agent|manager|regional_office|
+-----------+-------+---------------+
|          0|      0|              0|
+-----------+-------+---------------+



In [0]:
accounts.repartition(1).write.mode("overwrite").option("header",'true').csv("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/transformed-data/accounts")
products.repartition(1).write.mode("overwrite").option("header",'true').csv("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/transformed-data/products")
salestransaction_clean.repartition(1).write.mode("overwrite").option("header",'true').csv("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/transformed-data/salestransaction")
salesteam.repartition(1).write.mode("overwrite").option("header",'true').csv("abfss://crm-analysis-data@crmanalysisdata.dfs.core.windows.net/transformed-data/salesteam")
