In [1]:
print("Hello world")

Hello world


In [2]:
from pyspark.sql import SparkSession

# Create a simple Spark session
spark = SparkSession.builder \
    .appName("SimpleTest") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Create a simple DataFrame
df = spark.range(1000).toDF("number")
df.show()

spark.stop()



+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows



In [11]:
spark.stop()

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 4g pyspark-shell'

# Initialize Spark session
spark = SparkSession.builder \
    .appName('FraudDetection') \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

# Load the data
path = "Fraud.csv"
fraud = spark.read.csv(path, header=True, inferSchema=True)

# Display the original count of rows
original_count = fraud.count()
print(f"Original count of rows: {original_count}")

# Remove the last 1000 rows
if original_count > 636000:
    filtered_fraud = fraud.limit(original_count - 636000)
else:
    print("Not enough rows to remove 1000.")
    filtered_fraud = fraud  # Keep all rows if less than 1000

# Display the count of rows after filtering
filtered_count = filtered_fraud.count()
print(f"Count of rows after removing last 1000: {filtered_count}")

# Checking unique values for nameOrig and nameDest columns
print(filtered_fraud.select('nameOrig').distinct().count())
print(filtered_fraud.select('nameDest').distinct().count())

# Check for null values
filtered_fraud.select([col(c).isNull().alias(c) for c in filtered_fraud.columns]).show()
print("null values done")

# Fraud transaction statistics
total_count = filtered_fraud.count()
legit_transaction = filtered_fraud.filter(filtered_fraud.isFraud == 0).count()
fraud_transaction = filtered_fraud.filter(filtered_fraud.isFraud == 1).count()
print("statistics done")
legit_percent = (legit_transaction / total_count) * 100
fraud_percent = (fraud_transaction / total_count) * 100

print(f"Total Legit transactions: {legit_transaction}")
print(f"Total Fraud transactions: {fraud_transaction}")
print(f"Percentage of Legit transactions: {legit_percent:.4f}%")
print(f"Percentage of Fraud transactions: {fraud_percent:.4f}%")

# Analyzing fraud by transaction type
grouped_df = filtered_fraud.groupBy('type', 'isFraud').count().toPandas()
print(grouped_df.pivot(index='type', columns='isFraud', values='count'))

# Label Encoding categorical columns
indexer = StringIndexer(inputCol="type", outputCol="typeIndex")
filtered_fraud = indexer.fit(filtered_fraud).transform(filtered_fraud)

indexer_nameDest = StringIndexer(inputCol="nameDest", outputCol="nameDestIndex")
filtered_fraud = indexer_nameDest.fit(filtered_fraud).transform(filtered_fraud)

# Create new features: diffDist and diffOrg
filtered_fraud = filtered_fraud.withColumn("diffDist", col("oldbalanceDest") - col("newbalanceDest"))
filtered_fraud = filtered_fraud.withColumn("diffOrg", col("oldbalanceOrg") - col("newbalanceOrig"))

# Ensure non-negative values for log transformation
for col_name in ['amount', 'oldbalanceDest', 'newbalanceDest', 'oldbalanceOrg', 'newbalanceOrig']:
    min_value = filtered_fraud.agg({col_name: "min"}).collect()[0][0]
    shift_value = abs(min_value) + 1 if min_value <= 0 else 0
    filtered_fraud = filtered_fraud.withColumn(col_name, log(col(col_name) + shift_value))

# Drop irrelevant columns
filtered_fraud = filtered_fraud.drop('nameOrig', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest')
print("features dropped")

# Assembling features for the model
feature_cols = ['amount', 'diffDist', 'diffOrg', 'typeIndex', 'nameDestIndex']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
filtered_fraud = assembler.transform(filtered_fraud)
print("features encoded")

filtered_fraud = filtered_fraud.repartition(1000)

# Standardizing features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
filtered_fraud = scaler.fit(filtered_fraud).transform(filtered_fraud)
print("features scaled")

# Split the data into train and test sets
train_data, test_data = filtered_fraud.randomSplit([0.8, 0.2], seed=42)

# Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="isFraud", featuresCol="scaledFeatures", maxDepth=5)
model = dt.fit(train_data)

# Predictions
predictions = model.transform(test_data)

# Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion Matrix
y_true = predictions.select("isFraud").toPandas()
y_pred = predictions.select("prediction").toPandas()
sns.heatmap(confusion_matrix(y_true, y_pred), annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix")
plt.show()

# Stop Spark session
spark.stop()


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [8]:
spark.stop()


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [9]:
spark

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

<pyspark.sql.session.SparkSession at 0x25f02113ed0>