In [5]:
#Identify Error

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace

# Create Spark session
spark = SparkSession.builder.appName("ErrorDetection").getOrCreate()

# Load data
df_dirty = spark.read.csv('tax_dirty.csv', inferSchema=True, header=True)

# Check for typos in the 'f_name' and 'city' columns
df_dirty = df_dirty.withColumn('f_name_error', when(col('f_name').rlike('[^a-zA-Z\']'), 1).otherwise(0))
df_dirty = df_dirty.withColumn('city_error', when(col('city').rlike('[^a-zA-Z ]'), 1).otherwise(0))

# Check for formatting issues in the 'zip' and 'rates' columns
df_dirty = df_dirty.withColumn('zip_error', when(col('zip').rlike('^0|[^0-9]'), 1).otherwise(0))
df_dirty = df_dirty.withColumn('rate_error', when(col('rate').contains('.'), 1).otherwise(0))

# Check for violated attribute dependencies between 'state'/'city' and 'marital_status'/'has_child'
df_dirty = df_dirty.withColumn('state_city_error', when((col('state') == 'CA') & (col('city') != 'San Francisco'), 1).otherwise(0))
df_dirty = df_dirty.withColumn('marital_child_error', when((col('marital_status') == 'married') & (col('has_child') != 'yes'), 1).otherwise(0))

df_dirty.show()


+---------+----------+------+---------+--------+--------------+-----+-----+--------------+---------+------+---------+------------+-------------+-----------+------------+----------+---------+----------+----------------+-------------------+
|   f_name|    l_name|gender|area_code|   phone|          city|state|  zip|marital_status|has_child|salary|     rate|single_exemp|married_exemp|child_exemp|f_name_error|city_error|zip_error|rate_error|state_city_error|marital_child_error|
+---------+----------+------+---------+--------+--------------+-----+-----+--------------+---------+------+---------+------------+-------------+-----------+------------+----------+---------+----------+----------------+-------------------+
| Pengyuan|   Zendler|     F|      508|744-9007|    SWAMPSCOTT|   MA| 1907|             M|        N| 90000|      5.3|           0|         7150|          0|           0|         0|        0|         1|               0|                  0|
|      Nik|     Tacic|     M|      702|517-7

In [6]:
#how many errors in each columns

In [48]:
# Count the errors
error_columns = ['f_name_error', 'city_error', 'zip_error', 'rate_error', 'state_city_error', 'marital_child_error']
for column in error_columns:
    print(f"{column}: {df_dirty.filter(col(column) == 1).count()}")


f_name_error: 0
city_error: 219


                                                                                

zip_error: 0
rate_error: 199813
state_city_error: 3831
marital_child_error: 0


In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, length
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Create Spark session
spark = SparkSession.builder.appName("ErrorDetection").getOrCreate()

# Load data
df_dirty = spark.read.csv('tax_dirty.csv', inferSchema=True, header=True)

# Prepare the data
df_clean = df_dirty.drop('l_name', 'gender', 'area_code', 'phone', 'salary', 'single_exemp',
                         'married_exemp', 'child_exemp')  # Drop unnecessary columns

df_clean = df_clean.withColumn('error', when((col('f_name').rlike('[^a-zA-Z\']')) | (col('city').rlike('[^a-zA-Z ]'))
                                             | (col('zip').rlike('^0|[^0-9]')) | (col('rate').contains('.')) |
                                             ((col('state') == 'CA') & (col('city') != 'San Francisco')) |
                                             ((col('marital_status') == 'married') & (col('has_child') != 'yes')),
                                             1).otherwise(0))

# Feature engineering (example: extracting the length of 'f_name' and 'city' columns)
df_clean = df_clean.withColumn('f_name_length', length(col('f_name').cast('string')))
df_clean = df_clean.withColumn('city_length', length(col('city').cast('string')))

# Split the data into training and testing datasets
(train_data, test_data) = df_clean.randomSplit([0.7, 0.3], seed=42)

# Prepare the features vector
feature_columns = ['f_name_length', 'city_length']  # Add more columns as needed
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Train the model
lr = LogisticRegression(labelCol='error', featuresCol='features')
model = lr.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol='error')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

# Predict errors on new data
new_data = spark.read.csv('new_data.csv', inferSchema=True, header=True)  # Replace with your new data file

# Feature engineering for new data (calculate f_name_length and city_length)
new_data = new_data.withColumn('f_name_length', length(col('f_name').cast('string')))
new_data = new_data.withColumn('city_length', length(col('city').cast('string')))

new_data = assembler.transform(new_data)

predictions_new_data = model.transform(new_data)
predicted_errors = predictions_new_data.select('f_name', 'prediction').filter(col('prediction') == 1)

predicted_errors.show()


                                                                                

Accuracy: 0.5496733545778227
+---------+----------+
|   f_name|prediction|
+---------+----------+
| Pengyuan|       1.0|
|      Nik|       1.0|
|    Hovav|       1.0|
|Xiangning|       1.0|
|    Belen|       1.0|
|Sudhanshu|       1.0|
|  Babette|       1.0|
|    Noury|       1.0|
| Rossella|       1.0|
|   Fadila|       1.0|
|    Nabuo|       1.0|
|   Violet|       1.0|
|  Elliott|       1.0|
|   Gherin|       1.0|
|   Thoddi|       1.0|
|   Aurora|       1.0|
|     Irit|       1.0|
|     Tina|       1.0|
|   Howard|       1.0|
|    Raven|       1.0|
+---------+----------+
only showing top 20 rows



In [65]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, length
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Create Spark session
spark = SparkSession.builder.appName("ErrorDetection").getOrCreate()

# Load data
df_dirty = spark.read.csv('tax_dirty.csv', inferSchema=True, header=True)

# Prepare the data
df_clean = df_dirty.drop('l_name', 'gender', 'area_code', 'phone', 'salary', 'single_exemp',
                         'married_exemp', 'child_exemp')  # Drop unnecessary columns

df_clean = df_clean.withColumn('error', when((col('f_name').rlike('[^a-zA-Z\']')) | (col('city').rlike('[^a-zA-Z ]'))
                                             | (col('zip').rlike('^0|[^0-9]')) | (col('rate').contains('.')) |
                                             ((col('state') == 'CA') & (col('city') != 'San Francisco')) |
                                             ((col('marital_status') == 'married') & (col('has_child') != 'yes')),
                                             1).otherwise(0))

# Feature engineering (example: extracting the length of 'f_name', 'city', and 'zip' columns)
df_clean = df_clean.withColumn('f_name_length', length(col('f_name').cast('string')))
df_clean = df_clean.withColumn('city_length', length(col('city').cast('string')))
df_clean = df_clean.withColumn('zip_length', length(col('zip').cast('string')))

# Split the data into training and testing datasets
(train_data, test_data) = df_clean.randomSplit([0.7, 0.3], seed=42)

# Prepare the features vector
feature_columns = ['f_name_length', 'city_length', 'zip_length']  # Add more columns as needed
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Train the model
lr = LogisticRegression(labelCol='error', featuresCol='features')
model = lr.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol='error')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

# Predict errors on new data
new_data = spark.read.csv('new_data.csv', inferSchema=True, header=True)  # Replace with your new data file

# Feature engineering for new data (calculate f_name_length, city_length, and zip_length)
new_data = new_data.withColumn('f_name_length', length(col('f_name').cast('string')))
new_data = new_data.withColumn('city_length', length(col('city').cast('string')))
new_data = new_data.withColumn('zip_length', length(col('zip').cast('string')))

new_data = assembler.transform(new_data)

predictions_new_data = model.transform(new_data)
predicted_errors = predictions_new_data.select('f_name','city','zip', 'prediction').filter(col('prediction') == 1)

predicted_errors.show()


                                                                                

Accuracy: 0.9081510151917853
+---------+--------------+-----+----------+
|   f_name|          city|  zip|prediction|
+---------+--------------+-----+----------+
| Pengyuan|    SWAMPSCOTT| 1907|       1.0|
|      Nik|     LAS VEGAS|89140|       1.0|
|    Hovav|         HASTY|72640|       1.0|
|Xiangning|    BRIGANTINE| 8203|       1.0|
|    Belen|      FLORENCE|54121|       1.0|
|Sudhanshu|   BROWNSVILLE|78521|       1.0|
|  Babette|    WASHINGTON|20226|       1.0|
|    Noury|      LAWRENCE|66049|       1.0|
| Rossella|    WASHINGTON|20541|       1.0|
|   Fadila|SAINT MATTHEWS|29135|       1.0|
|    Nabuo|     SOUTHBURY| 6488|       1.0|
|   Violet|         BRADY|59416|       1.0|
|  Elliott|        AUBURN|98001|       1.0|
|   Gherin|      ROCHELLE|61068|       1.0|
|   Thoddi|     WILLAMINA|97396|       1.0|
|   Aurora|      LAKEWOOD|98498|       1.0|
|     Irit|        SENECA|69161|       1.0|
|     Tina|       GAFFNEY|29340|       1.0|
|   Howard|      STAMFORD| 6913|       1.0|
|  