In [1]:
from google.cloud import storage

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType, StringType


schema = StructType([
    StructField("random1", StringType(), True),        
    StructField("random2", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("random4", StringType(), True),
    StructField("random5", StringType(), True),
    StructField("label", DoubleType(), False),
    StructField("reviews", StringType(), True),
    StructField("random8", StringType(), True),
    StructField("random9", StringType(), True),
])

spark = SparkSession.builder.\
        master("local[*]").\
        appName("processing").\
        getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 07:14:08 INFO SparkEnv: Registering MapOutputTracker
24/08/24 07:14:08 INFO SparkEnv: Registering BlockManagerMaster
24/08/24 07:14:08 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/08/24 07:14:09 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
# myTable = spark.read.format("csv").schema(schema).load("gs://pyq/training")
myTable = spark.read.format("csv").schema(schema).load("gs://pyq/output.csv")


In [4]:
from pyspark.sql import functions as F

# Create a DataFrame with the count of null values for each column
null_counts = myTable.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in myTable.columns])

# Show the result
null_counts.show()

                                                                                

+-------+-------+---------+-------+-------+-----+-------+-------+-------+
|random1|random2|timestamp|random4|random5|label|reviews|random8|random9|
+-------+-------+---------+-------+-------+-----+-------+-------+-------+
|      0|    283|      928|    649|    768|  911|    868|   1202|   1213|
+-------+-------+---------+-------+-------+-----+-------+-------+-------+



In [5]:
myTable.show(1)

+--------------------+-------+-------------------+-------+--------------------+-----+--------------------+-------+--------------------+
|             random1|random2|          timestamp|random4|             random5|label|             reviews|random8|             random9|
+--------------------+-------+-------------------+-------+--------------------+-----+--------------------+-------+--------------------+
|--FcbSxK1AoEtEAxO...|      0|2017-10-09 18:40:02|      0|-zF5IYw28q-ZOj8lh...|  5.0|Great place to go...|      0|dweQtsAEjtkpQQ_Tv...|
+--------------------+-------+-------------------+-------+--------------------+-----+--------------------+-------+--------------------+
only showing top 1 row



In [6]:
columns_to_drop = ['random1', 'random2', 'random4', 'random5', 'random8', 'random9', 'random10']
myTable = myTable.drop(*columns_to_drop)

In [7]:
myTable.show(1)

+-------------------+-----+--------------------+
|          timestamp|label|             reviews|
+-------------------+-----+--------------------+
|2017-10-09 18:40:02|  5.0|Great place to go...|
+-------------------+-----+--------------------+
only showing top 1 row



In [8]:
from pyspark.sql import functions as F

# Convert timestamp to numeric (e.g., epoch time)
myTable = myTable.withColumn("timestamp_numeric", F.col("timestamp").cast("long"))

# Perform your operations on the numeric column
# Example: fill missing values with a constant value or mode
myTable = myTable.na.fill({"timestamp_numeric": 0})

# Convert back to timestamp
myTable = myTable.withColumn("timestamp", F.col("timestamp_numeric").cast("timestamp")).drop("timestamp_numeric")

# Show the result
myTable.show()


+-------------------+-----+--------------------+
|          timestamp|label|             reviews|
+-------------------+-----+--------------------+
|2017-10-09 18:40:02|  5.0|Great place to go...|
|2018-05-13 13:12:18|  1.0|I used to come he...|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|2018-11-08 19:35:49|  2.0|I purchased an $8...|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|2019-01-14 18:47:26|  5.0|Free vacuuming af...|
|1970-01-01 00:00:00| NULL|                NULL|
|2019-03-13 20:24:45|  2.0|This car wash use...|
|2019-03-30 14:38:10|  2.0|Very disappointed...|
|2019-05-17 23:52:10|  2.0|This use to be on...|
|2019-10-04 19:28:36|  5.0|With so many car ...|
|2019-10-05 14:48:41|  5.0|Excellent attenti...|
|2020-05-29 00:27:01|  5.0|Great experience ...|
|2020-08-25 13:50:11

In [9]:
# from pyspark.sql import functions as F

# # Check for rows where the timestamp is 0 (or any other specific value you want to check)
# zero_timestamp_rows = myTable.filter(F.col("timestamp").cast("long") == 0)

# # Count the number of such rows
# num_zero_timestamp_rows = zero_timestamp_rows.count()

# # Show the rows with zero timestamps (optional)
# zero_timestamp_rows.show()

# print(f"Number of rows with zero timestamp values: {num_zero_timestamp_rows}")


In [10]:
from pyspark.ml.feature import Imputer
from pyspark.sql import functions as F

# Convert timestamp to numeric (e.g., epoch time)
myTable = myTable.withColumn("timestamp_numeric", F.col("timestamp").cast("long"))

# Initialize the Imputer
imputer = Imputer(inputCols=["timestamp_numeric"], outputCols=["timestamp_numeric"])

# Fit the Imputer model and transform the DataFrame
imputed_df = imputer.fit(myTable).transform(myTable)

# Convert back to timestamp
imputed_df = imputed_df.withColumn("timestamp", F.col("timestamp_numeric").cast("timestamp")).drop("timestamp_numeric")

# Show the result
imputed_df.show()


+-------------------+-----+--------------------+
|          timestamp|label|             reviews|
+-------------------+-----+--------------------+
|2017-10-09 18:40:02|  5.0|Great place to go...|
|2018-05-13 13:12:18|  1.0|I used to come he...|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|2018-11-08 19:35:49|  2.0|I purchased an $8...|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|1970-01-01 00:00:00| NULL|                NULL|
|2019-01-14 18:47:26|  5.0|Free vacuuming af...|
|1970-01-01 00:00:00| NULL|                NULL|
|2019-03-13 20:24:45|  2.0|This car wash use...|
|2019-03-30 14:38:10|  2.0|Very disappointed...|
|2019-05-17 23:52:10|  2.0|This use to be on...|
|2019-10-04 19:28:36|  5.0|With so many car ...|
|2019-10-05 14:48:41|  5.0|Excellent attenti...|
|2020-05-29 00:27:01|  5.0|Great experience ...|
|2020-08-25 13:50:11

In [11]:
# Fill null values in 'label' with a default value (e.g., 0.0)
myTable = myTable.na.fill({"label": 0.0})

# Verify that nulls have been filled
myTable.select(F.sum(F.col("label").isNull().cast("int")).alias("null_count")).show()


+----------+
|null_count|
+----------+
|         0|
+----------+



In [12]:
# Fill null values in 'label' with a default value (e.g., 0.0)
myTable = myTable.na.fill({"reviews": ''})

# Verify that nulls have been filled
myTable.select(F.sum(F.col("reviews").isNull().cast("int")).alias("null_count")).show()

+----------+
|null_count|
+----------+
|         0|
+----------+



In [13]:
from pyspark.sql import functions as F

# Create a DataFrame with the count of null values for each column
null_counts = myTable.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in myTable.columns])

# Show the result
null_counts.show()


+---------+-----+-------+-----------------+
|timestamp|label|reviews|timestamp_numeric|
+---------+-----+-------+-----------------+
|        0|    0|      0|                0|
+---------+-----+-------+-----------------+



In [14]:
myTable.show()

+-------------------+-----+--------------------+-----------------+
|          timestamp|label|             reviews|timestamp_numeric|
+-------------------+-----+--------------------+-----------------+
|2017-10-09 18:40:02|  5.0|Great place to go...|       1507574402|
|2018-05-13 13:12:18|  1.0|I used to come he...|       1526217138|
|1970-01-01 00:00:00|  0.0|                    |                0|
|1970-01-01 00:00:00|  0.0|                    |                0|
|2018-11-08 19:35:49|  2.0|I purchased an $8...|       1541705749|
|1970-01-01 00:00:00|  0.0|                    |                0|
|1970-01-01 00:00:00|  0.0|                    |                0|
|1970-01-01 00:00:00|  0.0|                    |                0|
|1970-01-01 00:00:00|  0.0|                    |                0|
|2019-01-14 18:47:26|  5.0|Free vacuuming af...|       1547491646|
|1970-01-01 00:00:00|  0.0|                    |                0|
|2019-03-13 20:24:45|  2.0|This car wash use...|       1552508

In [15]:
original_count = myTable.count()
print(f"Original number of rows: {original_count}")

Original number of rows: 1848


In [16]:
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, Tokenizer
# from pyspark.sql.functions import explode

# # Step 1: Tokenize the text
# tokenizer = Tokenizer(inputCol="reviews", outputCol="words")
# wordsData = tokenizer.transform(myTable)

# # # Step 2: Explode tokens into separate rows
# explodedWordsData = wordsData.withColumn("word", explode(wordsData.words))

# # Step 3: Filter out empty tokens if any
# filteredWordsData = explodedWordsData.filter(explodedWordsData.word != "")

# # Step 4: Index the words
# indexer = StringIndexer(inputCol="word", outputCol="word_index")
# indexedData = indexer.fit(filteredWordsData).transform(filteredWordsData)

# # Step 5: One-hot encode the indexed words
# encoder = OneHotEncoder(inputCols=["word_index"], outputCols=["onehot_features"])
# oneHotData = encoder.fit(indexedData).transform(indexedData)

# # Step 6: Show the result
# oneHotData.select("reviews", "word", "onehot_features").show(truncate=False)


In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Tokenizer
from pyspark.ml.feature import SQLTransformer

# Step 1: Tokenize the text
tokenizer = Tokenizer(inputCol="reviews", outputCol="words")

# Step 2: Explode tokens into separate rows using SQLTransformer
explode_transformer = SQLTransformer(
    statement="SELECT *, EXPLODE(words) AS word FROM __THIS__"
)

# Step 3: Filter out empty tokens using SQLTransformer
filter_transformer = SQLTransformer(
    statement="SELECT * FROM __THIS__ WHERE word != ''"
)

# Step 4: Index the words
indexer = StringIndexer(inputCol="word", outputCol="word_index")

# Step 5: One-hot encode the indexed words
encoder = OneHotEncoder(inputCols=["word_index"], outputCols=["onehot_features"])

# Assemble the stages into a pipeline
pipeline = Pipeline(stages=[tokenizer, explode_transformer, filter_transformer, indexer, encoder])


In [18]:
# Fit the pipeline model
pipeline_model = pipeline.fit(myTable)


                                                                                

In [19]:
# Apply the pipeline model to the DataFrame
transformed_data = pipeline_model.transform(myTable)

# Select specific columns and show the results
transformed_data.select("reviews", "word", "onehot_features").show(5, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----------------+
|reviews                                                                                                                                                                                |word |onehot_features  |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----------------+
|Great place to go get your car washed took out the monthly package the guys there are really helpful and go beyond to help you suggest using their car wash really nice friendly people|great|(8445,[30],[1.0])|
|Great place to go get your car washed took out the monthly package the guys there are really helpful and go beyond to help you suggest using their car wash rea

In [20]:
# Save the entire pipeline model
pipeline_model_path = "gs://pyq/pipeline_model"
pipeline_model.write().overwrite().save(pipeline_model_path)


                                                                                

In [21]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Tokenizer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import explode

# Assemble features
assembler = VectorAssembler(inputCols=["onehot_features", "timestamp_numeric"], outputCol="features")
assembled_df = assembler.transform(transformed_data)

# Split data
train_df, test_df = assembled_df.randomSplit([0.8, 0.2], seed=1234)

# Train the model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

# Make predictions
predictions = lr_model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Stop Spark session
# spark.stop()


24/08/24 07:15:10 WARN DAGScheduler: Broadcasting large task binary with size 1309.1 KiB
24/08/24 07:15:21 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:25 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:26 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:26 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:27 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:28 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:28 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:29 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:29 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:30 WARN DAGScheduler: Broadcasting large task binary with size 1309.9 KiB
24/08/24 07:15:31 WAR

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
lr_model.save("gs://pyq/lrmodel")

In [None]:
from pyspark.ml.classification import LogisticRegressionModel

# Load the saved model
model_path = "gs://pyq/lrmodel"
model = LogisticRegressionModel.load(model_path)

# Use the model to make predictions
predictions = model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


In [None]:
accuracy