## Mount Data

In [None]:
# Mount an ADLS Gen2 container on DBFS using a service principal (OAuth client credentials).
#
# NOTE(review): applicationId / directoryID / secretValue below look like redacted
# placeholders. Real values should not be committed in the notebook; load them at
# runtime instead (for example from a Databricks secret scope).
storageAccountName = 'primumstorage01'
containerName = 'bd-project'
applicationId = 'applicationId'
directoryID = 'directoryID'
secretValue = 'secretValue'
# Azure AD token endpoint for the tenant identified by directoryID
endpoint = 'https://login.microsoftonline.com/' + directoryID + '/oauth2/token'
# abfss URI of the container root
source = 'abfss://' + containerName + '@' + storageAccountName + '.dfs.core.windows.net/'
mountPoint = "/mnt/my_data"

# Hadoop ABFS settings passed to the mount call
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": applicationId,
           "fs.azure.account.oauth2.client.secret": secretValue,
           "fs.azure.account.oauth2.client.endpoint": endpoint}

# Mounting onto a mount point that is already in use fails, so only mount when
# the mount point is absent. This keeps the cell safe to re-run.
already_mounted = any(m.mountPoint == mountPoint for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(source = source, mount_point = mountPoint, extra_configs = configs)

True

In [None]:
# Detach the storage container from /mnt/my_data (clean-up helper).
# NOTE(review): when the notebook is run top to bottom this undoes the mount
# performed in the first cell, while later cells still read from /mnt/my_data.
# Run it only when the mount is no longer needed.
dbutils.fs.unmount("/mnt/my_data")

/mnt/my_data has been unmounted.


True

In [None]:
# Show what is stored in the Users folder of the mounted container
users_dir_listing = dbutils.fs.ls("/mnt/my_data/Users")
display(users_dir_listing)

path,name,size,modificationTime
"dbfs:/mnt/my_data/Users/""raw_st"".""users"".csv","""raw_st"".""users"".csv",242846485,1715171646000


In [None]:
 # List every DBFS mount in the workspace (mount point and backing source)
 dbutils.fs.mounts()

[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/mnt/my_data', source='abfss://bd-project@primumstorage01.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType=''),
 MountInfo(mountPoint='/volume', source='DbfsReserved', encryptionType='')]

In [None]:
# Verify that the mount point exists and is backed by the expected container.
# Comparing `source` with a string rebuilt from the same variables is always
# true, so the check looks at the workspace's actual mount table instead.

expected_source = 'abfss://' + containerName + '@' + storageAccountName + '.dfs.core.windows.net/'

# Map each mount point to the source it is currently attached to
mounted_sources = {m.mountPoint: m.source for m in dbutils.fs.mounts()}

if mounted_sources.get(mountPoint) == expected_source:
    print("Data is mounted correctly")
else:
    print("Data is not mounted correctly")

##  Machine Learning


In [None]:
# Start (or reuse) the Spark session used for loading the tables
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("Table Loading")
spark = session_builder.getOrCreate()

# Handle to the session's SparkContext
sc = spark.sparkContext

In [None]:
# Read everything under the Posts folder as a single parquet dataset
file_location = "/mnt/my_data/ml_training/Posts/*"

posts = spark.read.parquet(file_location)

# display(posts)

In [None]:
# Confirm that the parquet read returned a Spark DataFrame
type(posts)

pyspark.sql.dataframe.DataFrame

In [None]:
# Creating the schema for the posttypes table.
# Import only the type classes that are used instead of `import *`, so the
# origin of every name is visible and the notebook namespace stays clean.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Two nullable columns: the numeric post-type id and its textual name
PT_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Type", StringType(), True)
])
display(PT_schema)

StructType([StructField('id', IntegerType(), True), StructField('Type', StringType(), True)])

In [None]:
# Load the post-types lookup file (comma separated, first row is a header)
# and apply PT_schema instead of inferring column types
file_location = "/mnt/my_data/ml_training/PostTypes.txt"

post_type_reader = spark.read.options(header="true", sep=",").schema(PT_schema)
postType = post_type_reader.csv(file_location)

display(postType)
type(postType)

In [None]:
# Creating the schema for the users table.
# Import only the type classes that are used instead of `import *`, so the
# origin of every name is visible and the notebook namespace stays clean.
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

# All columns are declared nullable
users_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("CreationDate", DateType(), True),
    StructField("DisplayName", StringType(), True),
    StructField("DownVotes", IntegerType(), True),
    StructField("EmailHash", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Reputation", IntegerType(), True),
    StructField("UpVotes", IntegerType(), True),
    StructField("Views", IntegerType(), True),
    StructField("WebsiteUrl", StringType(), True),
    StructField("AccountId", IntegerType(), True)
])

display(users_schema)



StructType([StructField('id', IntegerType(), True), StructField('Age', IntegerType(), True), StructField('CreationDate', DateType(), True), StructField('DisplayName', StringType(), True), StructField('DownVotes', IntegerType(), True), StructField('EmailHash', StringType(), True), StructField('Location', StringType(), True), StructField('Reputation', IntegerType(), True), StructField('UpVotes', IntegerType(), True), StructField('Views', IntegerType(), True), StructField('WebsiteUrl', StringType(), True), StructField('AccountId', IntegerType(), True)])

In [None]:
# Load the users CSV (comma separated, first row is a header) with the
# explicit users_schema instead of inferring column types
file_location = "/mnt/my_data/ml_training/users.csv"

users_reader = spark.read.options(header="true", sep=",").schema(users_schema)
users = users_reader.csv(file_location)

display(users)

In [None]:
# Save the 3 tables as parquet under /tmp/project on DBFS.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the
# write fails as soon as the target path already exists.
posts.write.mode("overwrite").parquet("/tmp/project/posts.parquet")
postType.write.mode("overwrite").parquet("/tmp/project/PostType.parquet")
users.write.mode("overwrite").parquet("/tmp/project/user.parquet")

In [None]:
# Review the files written under /tmp/project to confirm the three parquet outputs exist
display(dbutils.fs.ls("/tmp/project/"))

path,name,size,modificationTime
dbfs:/tmp/project/PostType.parquet/,PostType.parquet/,0,1715527670000
dbfs:/tmp/project/posts.parquet/,posts.parquet/,0,1715527666000
dbfs:/tmp/project/user.parquet/,user.parquet/,0,1715527671000


In [None]:
# Import necessary libraries and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower

In [None]:
# Start (or reuse) the Spark session for the modelling part of the notebook
ml_session_builder = SparkSession.builder.appName("ML Model")
spark = ml_session_builder.getOrCreate()

# Handle to the session's SparkContext
sc = spark.sparkContext

In [None]:
# Reload the three tables from the parquet copies written under /tmp/project
posts, postType, Users = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        "/tmp/project/posts.parquet",
        "/tmp/project/PostType.parquet",
        "/tmp/project/user.parquet",
    )
)

In [None]:
# For now only Posts and PostTypes are used to train the model,
# so join them on the post-type id.

post_type_match = posts.PostTypeId == postType.id
df = posts.join(postType, post_type_match)
display(df)


In [None]:
# Keep only the rows whose post type is "Question"
is_question = col("Type") == "Question"
df = df.filter(is_question)
# display(df)

In [None]:
# Strip HTML tags from the post body (non-greedy match of anything between < and >)
body_without_html = regexp_replace(df.Body, r'<.*?>', '')
# Replace "<" with a space and drop ">", trim, then split on spaces to get a list of tags
tag_list = split(trim(translate(col("Tags"), "<>", " ")), " ")

df = df.withColumn('Body', body_without_html).withColumn("Tags", tag_list)

# display(df)

In [None]:
# Filter the dataframe to only include questions
# (same predicate as the earlier filter cell; where() is an alias of filter())
df = df.where(col("Type") == "Question")
# display(df)


In [None]:
# Keep only the model inputs: the post body (renamed to "text") and its tag list
text_column = col("Body").alias("text")
df = df.select(text_column, col("Tags"))


In [None]:
# Turn the tag array into one row per tag.
# Each post's text is therefore repeated once for every tag it carries.
single_tag = explode("Tags").alias("tags")
df = df.select("text", single_tag)
# display(df)

---------------------------------- Now clean the text column of the DataFrame we created ----------------------------------------

In [None]:
# Save the dataframe as a checkpoint (in case the cluster gets terminated).
# mode("overwrite") keeps the cell re-runnable: with the default save mode the
# write fails once the target path already exists.
# NOTE(review): the other outputs live under /tmp/project/, so this path may
# have been meant to be "/tmp/project/df.parquet" — confirm before changing it.

df.write.mode("overwrite").parquet("/tmp/project.df.parquet")

In [None]:
# Keep the dataframe in memory for repeated use; cache() returns the same
# dataframe, and count() is the action that runs it and reports the row total
df.cache().count()

2697

In [None]:
# Preprocessing the text column, step by step:
#   1. remove URLs
#   2. replace every non-letter character with a space
#   3. collapse runs of whitespace into a single space
#   4. lower-case
#   5. trim leading/trailing spaces
# The character class in step 2 was "[^a-zA-z]": the range "A-z" also covers the
# ASCII characters between "Z" and "a" ([ \ ] ^ _ `), so those were not removed.
# "A-Z" is the intended range.
cleaned =         df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 
# display(cleaned)

In [None]:
display(df) # show the text before cleaning, to compare with `cleaned`

In [None]:
# Show the text after the cleaning steps
display(cleaned)

In [None]:
from pyspark.ml.feature import Tokenizer

# Split the cleaned text into an array column named "tokens"
tokenizer = Tokenizer().setInputCol("text").setOutputCol("tokens")
tokenized = tokenizer.transform(cleaned)

# display(tokenized)

In [None]:
from pyspark.ml.feature import StopWordsRemover

# Drop stop words from "tokens"; the remaining words go to "filtered"
stopword_remover = StopWordsRemover().setInputCol("tokens").setOutputCol("filtered")
stopword = stopword_remover.transform(tokenized)

# display(stopword)

In [None]:
from pyspark.ml.feature import CountVectorizer

# Term-count vectors over the filtered tokens; vocabulary capped at 2**16 terms
cv = CountVectorizer(inputCol="filtered", outputCol='cv', vocabSize=2**16)
# Learn the vocabulary from the data, then vectorise the same data
cv_model = cv.fit(stopword)
text_cv = cv_model.transform(stopword)

# display(text_cv)

In [None]:
from pyspark.ml.feature import HashingTF, IDF

# Re-weight the count vectors by inverse document frequency.
# minDocFreq=5 removes sparse terms.
idf = IDF(minDocFreq=5, inputCol='cv', outputCol="features")
idf_model = idf.fit(text_cv)
text_idf = idf_model.transform(text_cv)

# display(text_idf)

In [None]:
# Inspect the dataframe after the TF-IDF step (includes the new "features" column)
display(text_idf)

###  Note: each step above adds a new column to the DataFrame, derived from the previous column as a cleaned or transformed version of it

------------------------------------ The text column has been cleaned; now encode the tags column -------------------------------

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode each tag string as a numeric index in a new "label" column
label_encoder = StringIndexer().setInputCol("tags").setOutputCol("label")
# The fitted indexer is kept so the tag <-> index mapping can be reused
le_model = label_encoder.fit(text_idf)
final = le_model.transform(text_idf)

# display(final)

In [None]:
# Inspect the label-encoded dataframe.
# Requires `final` from the StringIndexer cell above; the NameError in the
# saved output suggests this cell was once run before that one.
display(final)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-755696554403870>, line 1[0m
[0;32m----> 1[0m display([43mfinal[49m)

[0;31mNameError[0m: name 'final' is not defined

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression limited to 100 iterations
lr = LogisticRegression().setMaxIter(100)
lr_model = lr.fit(final)

# NOTE: predictions are made on the same rows the model was fitted on,
# so scores computed from them describe the training data, not unseen data
predictions = lr_model.transform(final)

# display(predictions)


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [None]:
# Inspect the model output
display(predictions)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# MulticlassClassificationEvaluator does not compute ROC-AUC. When no metric is
# given it reports the F1 score, which this cell used to print as "ROC-AUC".
# The metric is now requested explicitly and labelled for what it is.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
f1 = evaluator.evaluate(predictions)
# Share of rows whose predicted label index equals the true label index
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1))

Accuracy Score: 0.3482
ROC-AUC: 0.2812


------------------------------------------------ All code in one cell -------------------------------------------------------

### Importing all the libraries
### (CountVectorizer is used below, so it is part of the import list)
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Preparing the data
### Step 1: Creating the joined table
df = posts.join(postType, posts.PostTypeId == postType.id)
### Step 2: Selecting only Question posts
df = df.filter(col("Type") == "Question")
### Step 3: Formatting the raw data
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', ''))
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), " "))
)
### Step 4: Selecting the columns
df = df.select(col("Body").alias("text"), col("Tags"))
### Step 5: Getting the tags
df = df.select("text", explode("Tags").alias("tags"))
### Step 6: Clean the text
### (the letter class is [^a-zA-Z]; the former "A-z" range also matched [ \ ] ^ _ `)
cleaned = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 

## Machine Learning
### Step 1: Train Test Split
train, test = cleaned.randomSplit([0.9, 0.1], seed=20200819)
### Step 2: Initializing the transformers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")
lr = LogisticRegression(maxIter=100)
### Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
### Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)
------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# End-to-end version of the steps above: prepare the data, split it, and fit
# a single Pipeline (tokenise -> stop words -> counts -> IDF -> label index -> LR).

# Importing all the libraries
# CountVectorizer is used below, so it is imported here rather than relying on
# an earlier cell having imported it.
from pyspark.sql.functions import split, translate, trim, explode, regexp_replace, col, lower
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Preparing the data
# Step 1: Creating the joined table
df = posts.join(postType, posts.PostTypeId == postType.id)
# Step 2: Selecting only Question posts
df = df.filter(col("Type") == "Question")
# Step 3: Formatting the raw data (strip HTML tags, turn the tag string into a list)
df = (df.withColumn('Body', regexp_replace(df.Body, r'<.*?>', ''))
      .withColumn("Tags", split(trim(translate(col("Tags"), "<>", " ")), " "))
)
# Step 4: Selecting the columns
df = df.select(col("Body").alias("text"), col("Tags"))
# Step 5: Getting the tags (one row per tag)
df = df.select("text", explode("Tags").alias("tags"))
# Step 6: Clean the text
# The letter class was "[^a-zA-z]": the range "A-z" also covers [ \ ] ^ _ `,
# so those characters were not removed. "A-Z" is the intended range.
cleaned = df.withColumn('text', regexp_replace('text', r"http\S+", "")) \
                    .withColumn('text', regexp_replace('text', r"[^a-zA-Z]", " ")) \
                    .withColumn('text', regexp_replace('text', r"\s+", " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text')) 

# Machine Learning
# Step 1: Train Test Split (90% / 10%, fixed seed)
train, test = cleaned.randomSplit([0.9, 0.1], seed=20200819)
# Step 2: Initializing the transformers
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
# NOTE(review): the indexer is fitted on `train` only. A tag that occurs only in
# `test` would hit StringIndexer's default handleInvalid="error" when the
# predictions are evaluated — consider handleInvalid="skip" or "keep".
label_encoder = StringIndexer(inputCol = "tags", outputCol = "label")
lr = LogisticRegression(maxIter=100)
# Step 3: Creating the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, label_encoder, lr])
# Step 4: Fitting and transforming (predicting) using the pipeline
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

Downloading artifacts:   0%|          | 0/34 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [None]:
# Save the fitted pipeline under the mounted storage (/mnt/my_data); your mount name may differ.
# write().overwrite() lets the cell be re-run without failing on an existing directory.
pipeline_model.write().overwrite().save('/mnt/my_data/model')

# Save the fitted StringIndexer so predicted label indices can be decoded back to tags later.
# It is taken from the fitted pipeline rather than from the stand-alone `le_model`:
# that one was fitted on the full dataset in an earlier cell, while the pipeline's
# indexer was fitted on `train`, so the two tag-to-index mappings can differ.
from pyspark.ml.feature import StringIndexerModel

label_indexer_model = next(
    stage for stage in pipeline_model.stages if isinstance(stage, StringIndexerModel)
)
label_indexer_model.write().overwrite().save('/mnt/my_data/stringindexer')

In [None]:
# Confirm that both saved artefacts exist by listing their directories
for saved_dir in ("/mnt/my_data/model", "/mnt/my_data/stringindexer"):
    display(dbutils.fs.ls(saved_dir))

path,name,size,modificationTime
dbfs:/mnt/my_data/model/metadata/,metadata/,0,1715590248000
dbfs:/mnt/my_data/model/stages/,stages/,0,1715590250000


path,name,size,modificationTime
dbfs:/mnt/my_data/stringindexer/data/,data/,0,1715590292000
dbfs:/mnt/my_data/stringindexer/metadata/,metadata/,0,1715590289000
