<a href="https://colab.research.google.com/github/KateBA715/KateBA715/blob/main/cis2349cFinalProjectPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# CIS2349C Final Project

# make sure pyspark is installed
!pip install pyspark



Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1c0ab7d919b8a22609a3921e05f1378b4519209c08c00b70a150a8b89f49278a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [21]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# import the classes and functions used in this notebook
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, IntegerType,\
                              StringType, DoubleType, BooleanType
import itertools

In [None]:
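# create a SparkSession, the entry point for the DataFrame API
# (a pip-installed PySpark runs in local mode on the Colab VM, not on a cluster)
spark=SparkSession.builder.appName('cis2349c').getOrCreate()

# display the session details (Spark version, master, app name)
spark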

In the next cell you will create a Spark dataframe (similar to, but different from, a Pandas dataframe) from the uploaded CSV data file (if you have not uploaded it yet, you should do so now). Note that the data file includes headers.

Use the spark.read.csv('filename') command to read the data file. Setting the <b>header</b> option in the arguments to <b>True</b> will properly assign the header names. For example, assuming you name your Spark dataframe "df_pyspark", use the following command:

<pre>df_pyspark = spark.read.csv('/content/cis2349cPaySim.csv', header=True)</pre>

Use the "Copy Path" option from the Files tool (left vertical menu) to properly set the path for the file.

Assign the dataframe to a variable for later use. In the code examples provided below the name 'df_pyspark' is used.

To view the first few lines of the data, use the show function, e.g.

<pre>df_pyspark.show(5)</pre>

In [None]:
# Load the data file into a Spark dataframe
df_pyspark = spark.read.csv('/content/cis2349cPaySim.csv', header=True)

# Print the schema of the dataframe
df_pyspark.printSchema()

# Show the first few rows of the dataframe
df_pyspark.show(5)


root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|       170136|     160296.36|M1979787155|             0|             0|      0|             0|
|   1| PAYME

Now let's clean up the data and do some transformations. The original file has some inconsistent column names, and we also want to use camelCase consistently for the headings. Let's change the following headings (look carefully at the names; the changes are subtle):

- 'oldbalanceOrg' should be 'oldBalanceOrig'
- 'newbalanceOrig' should be 'newBalanceOrig'
- 'oldbalanceDest' should be 'oldBalanceDest'
- 'newbalanceDest' should be 'newBalanceDest'

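To make these changes, we can use the PySpark DataFrame method "withColumnRenamed". DataFrames are immutable, so the method returns a new dataframe, which you need to assign back to a variable, e.g.

<pre>df_pyspark = df_pyspark.withColumnRenamed('oldbalanceOrg', 'oldBalanceOrig')</pre>

These calls can be chained to keep them logically organized. Wrap the chain in parentheses so Python treats the multi-line expression as a single statement, e.g.

<pre>df_pyspark = (df_pyspark.withColumnRenamed('oldname1', 'newname1')
                        .withColumnRenamed('oldname2', 'newname2'))</pre>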
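Because the differences are only in letter case, a typo is easy to miss. The plain-Python sketch below (not PySpark; the `renames` dictionary and the two checks are illustrative assumptions) applies the rename mapping to the header list printed by printSchema() and flags any old name that is not in the header, as well as any rename whose old and new names are identical.

```python
# Header names exactly as printed by df_pyspark.printSchema() above
original_cols = ["step", "type", "amount", "nameOrig", "oldbalanceOrg",
                 "newbalanceOrig", "nameDest", "oldbalanceDest",
                 "newbalanceDest", "isFraud", "isFlaggedFraud"]

# Old name -> new name (hypothetical dict form of the withColumnRenamed chain)
renames = {
    "oldbalanceOrg": "oldBalanceOrig",
    "newbalanceOrig": "newBalanceOrig",
    "oldbalanceDest": "oldBalanceDest",
    "newbalanceDest": "newBalanceDest",
}

# Old names must match the header exactly (compared case-sensitively)
missing = [old for old in renames if old not in original_cols]
# Renaming a column to its own name does nothing and usually hides a typo
no_ops = [old for old, new in renames.items() if old == new]

# Apply the mapping, leaving all other columns unchanged
new_cols = [renames.get(name, name) for name in original_cols]

print("missing:", missing)
print("no-op renames:", no_ops)
print(new_cols)
```

Once the list checks out, the same names could be applied in PySpark in one step with `df_pyspark.toDF(*new_cols)`, which replaces all column names by position.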

In [15]:
df_pyspark = (df_pyspark.withColumnRenamed('oldbalanceOrg', 'oldBalanceOrig')
                       .withColumnRenamed('newbalanceOrig', 'newBalanceOrig')
                       .withColumnRenamed('oldbalanceDest', 'oldBalanceDest')
                       .withColumnRenamed('newbalanceDest', 'newBalanceDest'))

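You may have noticed that all of the columns in the dataframe have a type of string, because spark.read.csv does not infer column types unless you pass inferSchema=True (or an explicit schema). We need to convert them to the appropriate types before we can process them further.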

The type changes needed are:

- step: integer
- amount: double
- oldBalanceOrig: double
- newBalanceOrig: double
- oldBalanceDest: double
- newBalanceDest: double
- isFraud: boolean
- isFlaggedFraud: boolean

To change a data type, use withColumn together with the Column cast method, e.g.

<pre>df_pyspark = df_pyspark.withColumn('step', col('step').cast('integer'))</pre>

As with the withColumnRenamed function, you can chain multiples of these together, e.g.

<pre>df_pyspark.withColumn('column1', col('column1').cast('type'))\
      .withColumn('column2', col('column2').cast('type'))</pre>

The boolean columns are a little more complex because the strings '1' and '0' must be mapped to True and False. Here is <b>isFraud</b> as an example:

<pre>df_pyspark = df_pyspark.withColumn('isFraud',
       when(col('isFraud').isin('1'), True)
      .when(col('isFraud').isin('0'), False)
      .otherwise(None))</pre>
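To see the conversion logic without Spark, here is a plain-Python sketch that takes the first data row shown by df_pyspark.show(5) (written with the renamed headers) and applies the target types from the list above. The `to_bool` helper and the `converters` dictionary are hypothetical; `to_bool` mirrors the when/otherwise expression, returning None (standing in for Spark's null) for anything other than '1' or '0'.

```python
import csv
import io

# First data row from df_pyspark.show(5), using the renamed headers
raw = (
    "step,type,amount,nameOrig,oldBalanceOrig,newBalanceOrig,nameDest,"
    "oldBalanceDest,newBalanceDest,isFraud,isFlaggedFraud\n"
    "1,PAYMENT,9839.64,C1231006815,170136,160296.36,M1979787155,0,0,0,0\n"
)

# Like spark.read.csv without inferSchema, csv.DictReader returns only strings
row = next(csv.DictReader(io.StringIO(raw)))
assert all(isinstance(value, str) for value in row.values())


def to_bool(value):
    """Mirror of when(col == '1', True).when(col == '0', False).otherwise(None)."""
    if value == "1":
        return True
    if value == "0":
        return False
    return None  # plays the role of Spark's null


# Target type for each column; columns not listed stay as strings
converters = {
    "step": int,
    "amount": float,
    "oldBalanceOrig": float,
    "newBalanceOrig": float,
    "oldBalanceDest": float,
    "newBalanceDest": float,
    "isFraud": to_bool,
    "isFlaggedFraud": to_bool,
}

typed_row = {name: converters.get(name, str)(value) for name, value in row.items()}
print(typed_row)
```

One difference worth knowing: Python's int() and float() raise an error on malformed text, whereas Spark's cast (with ANSI mode off, the default in Spark 3.5) quietly returns null, so it can be worth checking the converted columns for unexpected nulls.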


Finally, to verify the changes, use the printSchema method:

<pre>df_pyspark.printSchema()</pre>

In [35]:
df_pyspark = df_pyspark.withColumn('step', col('step').cast('integer'))\
    .withColumn('amount', col('amount').cast('double'))\
    .withColumn('oldBalanceOrig', col('oldBalanceOrig').cast('double'))\
    .withColumn('newBalanceOrig', col('newBalanceOrig').cast('double'))\
    .withColumn('oldBalanceDest', col('oldBalanceDest').cast('double'))\
    .withColumn('newBalanceDest', col('newBalanceDest').cast('double'))


In [36]:
from pyspark.sql.functions import col

df_pyspark = df_pyspark.withColumn('isFraud', col('isFraud').cast('string'))\
    .withColumn('isFraud',
       when(col('isFraud').isin('1'), True)
      .when(col('isFraud').isin('0'), False)
      .otherwise(None))\
    .withColumn('isFlaggedFraud', col('isFlaggedFraud').cast('string'))\
    .withColumn('isFlaggedFraud',
       when(col('isFlaggedFraud').isin('1'), True)
      .when(col('isFlaggedFraud').isin('0'), False)
      .otherwise(None))

df_pyspark.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldBalanceOrig: double (nullable = true)
 |-- newBalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldBalanceDest: double (nullable = true)
 |-- newBalanceDest: double (nullable = true)
 |-- isFraud: boolean (nullable = true)
 |-- isFlaggedFraud: boolean (nullable = true)



In a distributed data environment (e.g. Hadoop) there are various ways to access data for analysis using PySpark. "collect" retrieves the entire dataset and brings it to the driver ("collects it to the driver"), so it should be avoided for large datasets. "show", "take" and "head" also bring rows to the driver, but only the number of rows you ask for, so Spark can usually stop after reading just a small part of the data.

Here’s what happens when you use collect() (take() and head() follow the same steps, but for at most the requested number of rows):
- Data Aggregation: The distributed data residing on multiple worker nodes is gathered together.
- Data Transfer: This aggregated data is then transferred over the network to the driver node.
- Single Location: The data is now available on the driver node as a local object (like a list or array in Python), and you can interact with it just like you would with any local data structure in Python.

Using these methods has several implications:
- Memory Limitation: The driver node has its own memory limits. If you try to collect a very large dataset, it might not fit into the driver’s memory, leading to errors or crashes.
- Performance Impact: Collecting data from distributed nodes to a single node can be network and resource-intensive, especially for large datasets. It can significantly impact the performance and should be used judiciously.
- Use Cases: It’s often used for smaller datasets or when the final output of a large computation is relatively small (like collecting the final results of an aggregation).
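The difference between collecting everything and taking only a few rows can be sketched in plain Python. This is an analogy, not Spark code: the hypothetical `CountingSource` class stands in for a distributed dataset and counts how many rows are actually read.

```python
import itertools


class CountingSource:
    """Hypothetical stand-in for a distributed dataset that counts rows read."""

    def __init__(self, n_rows):
        self.n_rows = n_rows
        self.rows_read = 0

    def rows(self):
        for i in range(self.n_rows):
            self.rows_read += 1
            yield i


def collect(source):
    # Like collect(): materialises every row in local memory
    return list(source.rows())


def take(source, n):
    # Like take(n)/head(n)/show(n): stops reading after n rows
    return list(itertools.islice(source.rows(), n))


lazy = CountingSource(100_000)
first_five = take(lazy, 5)
print(first_five, lazy.rows_read)

full = CountingSource(100_000)
everything = collect(full)
print(len(everything), full.rows_read)
```

In Spark the same idea applies: if you only need a local sample, `df_pyspark.take(5)` or `df_pyspark.limit(5).toPandas()` keeps the transfer small, whereas `collect()` or `toPandas()` on the full dataframe moves everything to the driver.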

count() is another method which triggers computation over the entire DataFrame. Depending on the size of your data and the configuration of your Spark cluster, this operation might take some time for very large datasets.

In the following cell, show the row count of the dataframe (it should be 800000).

In [37]:
row_count = df_pyspark.count()
print('Row Count:', row_count)

Row Count: 353714


According to the author of our reference material for this project, we want to base the fraud analysis on transactions that are transfers (type == TRANSFER) or cash-outs (type == CASH_OUT), where the account owner is withdrawing funds.

As with Pandas, PySpark can filter dataframe rows based on logical conditions. In the following cell, filter the dataframe rows based on the transaction type as described above using a logical OR. Here is the filter command:

<pre>df_pyspark = df_pyspark.filter((col("type") == "TRANSFER") | (col("type") == "CASH_OUT"))</pre>

After running the filter, show the new row count of the dataframe (it should be 353714).
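The extra parentheses in the filter are required, not just stylistic: in Python the `|` operator binds more tightly than `==`. Plain integers follow the same precedence rules as PySpark Column objects, so this small sketch shows how the expression is parsed with and without them.

```python
a, b = 2, 3

# Each comparison is evaluated first, then the results are combined with |
with_parens = (a == 2) | (b == 4)     # True | False

# Parsed as a == (2 | b) == 4, i.e. a chained comparison
without_parens = a == 2 | b == 4

print(with_parens, without_parens)
```

With Column objects the unparenthesized form typically fails with an error rather than silently returning False, because the chained comparison forces Python to convert a Column into a plain bool. An equivalent filter that sidesteps the issue is `df_pyspark.filter(col("type").isin("TRANSFER", "CASH_OUT"))`.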

In [38]:
df_pyspark = df_pyspark.filter((col("type") == "TRANSFER") |
                               (col("type") == "CASH_OUT"))

row_count = df_pyspark.count()
print('Row Count:', row_count)

Row Count: 353714


For this exercise, we will use a simple heuristic (rule-based) approach of categorizing any large-value transaction as fraud. This approach is simplistic, but it is sufficient for our purposes.

To do this, we select the previously filtered transactions whose <b>isFraud</b> column is 1 (True) and calculate the median of their amounts with the <b>approxQuantile</b> method (PySpark had no built-in median aggregate before version 3.4, where pyspark.sql.functions.median was added). We will use that value as the threshold for suspected fraud.

Use the filter function provided in the previous cell to find the rows where isFraud == 1.

Here is the syntax for the approxQuantile method:

<pre>dataframe.approxQuantile('column', [probability], relativeError)[0]</pre>

Call the approxQuantile method on the amount column, with 0.5 as the probability (the median) and 0 as the relative error (0 requests an exact computation, which can be expensive on large datasets). approxQuantile returns a list with one value per requested probability, so the [0] at the end of the call extracts the median from that list.
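As a plain-Python illustration of this step (the transactions below are hypothetical values, not rows from the dataset), the filter keeps only the fraudulent amounts and their median becomes the threshold. approxQuantile with a relative error of 0 should return an actual value from the column, so for an even number of rows its result may differ from statistics.median, which averages the two middle values; the example uses an odd count to avoid that case.

```python
import statistics

# Hypothetical (amount, isFraud) pairs standing in for the filtered dataframe
transactions = [
    (181.0, True),
    (9839.64, False),
    (181.0, True),
    (229133.94, False),
    (5000.0, True),
    (20128.0, True),
    (215310.3, True),
]

# Equivalent of filtering on isFraud and selecting the amount column
fraud_amounts = [amount for amount, is_fraud in transactions if is_fraud]

if fraud_amounts:
    fraud_median = statistics.median(fraud_amounts)
    print("Fraudulent Transactions-Median Value:", fraud_median)
else:
    print("No fraudulent transactions found...this time.")
```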

In [41]:
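# isFraud is boolean after the conversion above; the dataframe was already
# filtered to TRANSFER and CASH_OUT, so the isin() check is only a safeguard
fraud_dfps = df_pyspark.filter((col("isFraud") == True) &
                               (col("type").isin(["TRANSFER", "CASH_OUT"])))
median_value = fraud_dfps.approxQuantile("amount", [0.5], 0)

# approxQuantile returns an empty list when no rows match the filter
if len(median_value) > 0:
    fraud_median = median_value[0]
    print('Fraudulent Transactions-Median Value:', fraud_median)
else:
    print('No fraudulent transactions found...this time.')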


No fraudulent transactions found...this time.


Now let's create an indicator column that marks whether a row should be included in our Fraud Heuristic data.

Use the "withColumn" method and a "when" expression specifying an amount greater than the calculated median. Set the column value to 1 for this condition and 0 otherwise:

<pre>df_pyspark = df_pyspark.withColumn("Fraud_Heuristic", when(col("amount") > fraud_median, 1).otherwise(0))</pre>

After applying this operation, show a selection of columns for the first few rows to verify:

<pre>df_pyspark.select("amount", "Fraud_Heuristic").show()</pre>
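The when/otherwise expression is a per-row if/else. A plain-Python version of the same rule, with a hypothetical threshold and amounts, looks like this; note that the comparison is strict, so an amount exactly equal to the median is not flagged.

```python
def fraud_heuristic(amount, threshold):
    # Mirrors when(col("amount") > threshold, 1).otherwise(0)
    return 1 if amount > threshold else 0


threshold = 5000.0  # hypothetical median of the fraudulent amounts
amounts = [181.0, 9839.64, 5000.0, 229133.94]

flags = [fraud_heuristic(amount, threshold) for amount in amounts]
print(list(zip(amounts, flags)))
```

One difference from Spark: if amount were null, the comparison inside when() evaluates to null, which is not treated as true, so the row falls through to otherwise(0). In plain Python, comparing None with a float raises a TypeError instead.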

In [43]:
fraud_dfps_count = fraud_dfps.count()
if fraud_dfps_count > 0:
    median_value = fraud_dfps.approxQuantile("amount",[0.5],0)
    fraud_median = median_value[0]
    print('Fraudulent Transactions-Median Value:', fraud_median)
else:
    print('No fraudulent transactions found...this time.')


No fraudulent transactions found...this time.


Now let's calculate an F1 score using PySpark's <b>MulticlassClassificationEvaluator</b>, part of Apache Spark's MLlib machine learning library. It is used to evaluate the performance of classification models on a dataset.

The F1 score is the harmonic mean of precision and recall, so it summarizes both in a single number. It is particularly useful for imbalanced data such as fraud, where plain accuracy is misleading and both false positives and false negatives are costly (e.g., a false positive for fraud can cause you to lose customers!)

The score combines precision and recall:

- <b>Precision</b> is the ratio of correctly predicted positive observations to the total predicted positives. It answers the question, "Of all the labels predicted as positive, how many are actually positive?"
- <b>Recall</b> is the ratio of correctly predicted positive observations to all observations in the actual class. It answers the question, "Of all the actual positives, how many did we predict as positive?"

Values of the F1 score range from 0 to 1, where 1 indicates perfect precision and recall; the higher the F1 score, the better.
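In terms of true positives (TP), false positives (FP) and false negatives (FN), the definitions above can be written as follows; the F1 score is the harmonic mean of precision and recall, which simplifies to the right-hand form.

$$\text{Precision} = \frac{TP}{TP + FP}, \qquad \text{Recall} = \frac{TP}{TP + FN}$$

$$F_1 = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}} = \frac{2\,TP}{2\,TP + FP + FN}$$

For example, with hypothetical counts TP = 8, FP = 2 and FN = 4, precision is 8/10 = 0.8, recall is 8/12 ≈ 0.667, and F1 = 16/22 ≈ 0.727.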

To set up the call to the evaluator, use the following code (assuming your PySpark dataframe is named "df_pyspark"):

<pre># Ensure that the columns are of type Double,
# required by the MulticlassClassificationEvaluator
df_pyspark = df_pyspark.withColumn("Fraud_Heuristic", col("Fraud_Heuristic")
                        .cast(DoubleType())) \
                        .withColumn("isFraud", col("isFraud").cast(DoubleType()))

# Create an evaluator using the "f1" metric (the F1 score of each class,
# averaged with weights proportional to how often each class occurs)
evaluator = MulticlassClassificationEvaluator(labelCol="isFraud",
                                              predictionCol="Fraud_Heuristic",
                                              metricName="f1")
</pre>

The evaluator constructor arguments are as follows:

- labelCol="isFraud": This parameter specifies the name of the column in the dataset that contains the true labels for the classification task.
- predictionCol="Fraud_Heuristic": This parameter indicates the name of the column that contains the predictions made by the classification model (here, our heuristic).
- metricName="f1": This parameter specifies the metric used to evaluate the model's performance. For MulticlassClassificationEvaluator, "f1" is the weighted F1 score: the F1 score of each label (fraud and not fraud), averaged with weights proportional to how often each label occurs.
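Because "f1" in MulticlassClassificationEvaluator is a weighted average over both labels, it can look better than the F1 score of the fraud class alone when non-fraud rows dominate. The plain-Python sketch below computes both for hypothetical labels and predictions; the helper names are illustrative, and each label's F1 is weighted by its share of the true labels.

```python
from collections import Counter


def f1_for_label(labels, preds, label):
    """F1 score treating `label` as the positive class."""
    pairs = list(zip(labels, preds))
    tp = sum(1 for y, p in pairs if y == label and p == label)
    fp = sum(1 for y, p in pairs if y != label and p == label)
    fn = sum(1 for y, p in pairs if y == label and p != label)
    if tp == 0:
        return 0.0
    return 2 * tp / (2 * tp + fp + fn)


def weighted_f1(labels, preds):
    """Each label's F1, weighted by how often that label occurs in `labels`."""
    counts = Counter(labels)
    total = len(labels)
    return sum(count / total * f1_for_label(labels, preds, label)
               for label, count in counts.items())


# Hypothetical isFraud labels and Fraud_Heuristic predictions (as doubles)
labels = [1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
preds = [1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

fraud_f1 = f1_for_label(labels, preds, 1.0)
overall_f1 = weighted_f1(labels, preds)
print(f"F1 for the fraud class: {fraud_f1:.3f}")
print(f"weighted F1 (like metricName='f1'): {overall_f1:.3f}")
```

To get the fraud-class F1 directly from Spark, the evaluator also accepts metricName="fMeasureByLabel" together with metricLabel=1.0 (available in Spark 3.0 and later).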

To actually run the evaluator and calculate the F1 score, run the following:

<pre># Calculate the F1 score
f1_score = evaluator.evaluate(df_pyspark)
</pre>

and then you can just print the f1_score.



If you calculated this score correctly, you should see a value of about 2/3 (0.66...).

Whether this F1 score is good or not so good depends on the context of the specific problem, the complexity of the task, the nature of the data, and the baseline or comparison scores.

In domains such as medical diagnosis or fraud detection, a higher F1 score is generally desired because both false negatives and false positives are costly. In such cases, an F1 score of 66% might be considered insufficient.