In [91]:
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType
import json
import pyspark
from pyspark.sql.functions import from_json, col
from datetime import datetime
from pyspark.sql.functions import *
import pyspark.sql.functions as f
import json

In [92]:
spark = SparkSession.builder.master("local").appName("Analytics").getOrCreate()
df = spark.read.option("header","true").csv("2022*/*.csv")
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number: string (nullable = true)
 |-- CVC: string (nullable = true)
 |-- expire: string (nullable = true)
 |-- IsLegit: string (nullable = true)
 |-- reason: string (nullable = true)



In [93]:
df.count()

639

In [94]:
df.show(10)

+-------------------+--------------------+-------+------------------+-------------------+----+------+-------+-------------+
|               date|                  ID|country|              name|             number| CVC|expire|IsLegit|       reason|
+-------------------+--------------------+-------+------------------+-------------------+----+------+-------+-------------+
|2022/12/09 16:55:14|dec03d92-77d9-11e...|    NZL|    Victoria Lopez|     30295515243574| 309| 06/32|   True|Card accepted|
|2022/12/09 16:55:15|dee1e320-77d9-11e...|    NOR|     Diane Johnson|      4073127356270|9873| 05/32|   True|Card accepted|
|2022/12/09 16:55:15|def3ad30-77d9-11e...|    UKR|        Evan Smith|   3574816785977710| 202| 09/28|  False|   False Card|
|2022/12/09 16:55:15|df13bf8a-77d9-11e...|    UKR|       Vicki Blake|       675929989928| 890| 06/31|   True|Card accepted|
|2022/12/09 16:55:15|df247dca-77d9-11e...|    COM|     Julian Taylor|   4566318204615185| 956| 09/32|   True|Card accepted|
|2022/12

In [95]:
df.groupBy("country").count().orderBy(col("count").desc()).limit(1).show()

+-------+-----+
|country|count|
+-------+-----+
|    LBY|    7|
+-------+-----+



[Stage 87:>                                                         (0 + 1) / 1]                                                                                

In [96]:
df = df.withColumn('date', to_timestamp(df.date, 'yyyy/MM/dd HH:mm:ss'))
df = df.withColumn("day", to_date(df.date))
df = df.withColumn("time", date_format(df.date, "HH:mm:ss"))

In [97]:
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- ID: string (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number: string (nullable = true)
 |-- CVC: string (nullable = true)
 |-- expire: string (nullable = true)
 |-- IsLegit: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- day: date (nullable = true)
 |-- time: string (nullable = true)



In [98]:
df.show()

+-------------------+--------------------+-------+------------------+-------------------+----+------+-------+-------------+----------+--------+
|               date|                  ID|country|              name|             number| CVC|expire|IsLegit|       reason|       day|    time|
+-------------------+--------------------+-------+------------------+-------------------+----+------+-------+-------------+----------+--------+
|2022-12-09 16:55:14|dec03d92-77d9-11e...|    NZL|    Victoria Lopez|     30295515243574| 309| 06/32|   True|Card accepted|2022-12-09|16:55:14|
|2022-12-09 16:55:15|dee1e320-77d9-11e...|    NOR|     Diane Johnson|      4073127356270|9873| 05/32|   True|Card accepted|2022-12-09|16:55:15|
|2022-12-09 16:55:15|def3ad30-77d9-11e...|    UKR|        Evan Smith|   3574816785977710| 202| 09/28|  False|   False Card|2022-12-09|16:55:15|
|2022-12-09 16:55:15|df13bf8a-77d9-11e...|    UKR|       Vicki Blake|       675929989928| 890| 06/31|   True|Card accepted|2022-12-09|16

In [99]:
def from_date(df, date):
    # date format : YYYY-MM-DD
    return df.filter(df.day == date)

# Get all transaction done after time
def from_time(df, time):
    # time format : HH:mm:ss
    return df.filter(df.time >= time)

def from_date_time(df, date, time):
    return df.filter((df.day == date) & (df.time >= time))

from datetime import datetime, date, timedelta

def days_between(d1, d2):
    d1 = datetime.strptime(d1, "%Y-%m-%d")
    d2 = datetime.strptime(d2, "%Y-%m-%d")
    return (d2 - d1).days

def from_range(df, start, end):
    total_count = df.filter((df.day >= start) & (df.day <= end)).count()
    list_day = []
    true_count = []
    false_count = []
    nb_day = days_between(start, end)
    for i in range(nb_day + 1):
        date = datetime.strptime(start, '%Y-%m-%d')
        result = date + timedelta(days=i)
        list_day.append(result)
        true_count.append(from_date(df, result).filter(col("IsLegit") == True).count())
        false_count.append(from_date(df, result).filter(col("IsLegit") == False).count())
    return total_count, list_day, true_count, false_count

In [100]:
from_date(df, "2022-12-08").count()

0

In [101]:
from_time(df, "20:00:00").count()

0

In [102]:
from_date_time(df, "2022-12-08", "16:28:00").count()

0

In [103]:
from_date(df, "2022-12-07").filter(col("IsLegit") == False).count()

0

In [104]:
a,b,c,d = from_range(df, "2022-12-07", "2022-12-08")

In [105]:
import plotly.express as px

# Create a list of x and y values
x = [1, 2, 3, 4, 5]
y = [10, 20, 30, 40, 50]
y2 = [5,5,5,5,5]
# Create the bar chart
fig = px.bar(x=b, y=[c, d], barmode="group", labels={'x':'Date', 'value':'Count'}, title="Transaction by day")
newnames = {"wide_variable_0":"True", "wide_variable_1":"False"}
fig.for_each_trace(lambda t: t.update(name = newnames[t.name]))
fig.show()

In [None]:
!streamlit run app.py

[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://192.168.150.130:8501[0m
[0m
22/12/09 17:03:07 WARN Utils: Your hostname, MSI resolves to a loopback address: 127.0.1.1; using 192.168.124.1 instead (on interface eth1)
22/12/09 17:03:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/09 17:03:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-12-09 17:03:21.278 Uncaught app exception
Traceback (most recent call last):
  File "/home/drahcear/pytorch/lib/python3.8/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 562, in _run_script
    exec

In [109]:
def plot_per_coutry(df, date):
    df_tmp = df.groupBy("country").count()
    df_country = df_tmp.select("country").collect()
    df_count = df_tmp.select("count").collect()
    df_tmp2 = df_tmp.withColumn("json", f.to_json(f.struct("country", "count")))
    country_dict = df_tmp2.select("json").rdd.map(lambda x: json.loads(x[0])).collect()
    fig = px.scatter_geo(country_dict, locations='country', color='country',
                     size='count', title='Countries by Number')
    return fig