In [1]:
import sys, glob, os
# Make the Spark installation's Python bindings importable (pyspark + bundled py4j).
SPARK_HOME=os.environ['SPARK_HOME']
sys.path.append(SPARK_HOME + "/python")
_py4j_zips = glob.glob(SPARK_HOME + "/python/lib/py4j*.zip")
if not _py4j_zips:
    # Fail with an actionable message instead of a bare IndexError.
    raise RuntimeError("No py4j*.zip found under " + SPARK_HOME + "/python/lib; check SPARK_HOME")
sys.path.append(_py4j_zips[0])
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf 


# Cassandra contact point used by Spark and by the Python driver cells below;
# override with the CASSANDRA_HOST environment variable.
cassandra_host = os.environ.get("CASSANDRA_HOST", "localhost")

# SparkConf stores every value as a string, so values are passed as strings explicitly.
# (The former `.set("", 10)` only registered a junk entry under an empty key and was removed.)
spark_conf = (SparkConf()
                .setAppName("BatchJob - Data loader")
                .setIfMissing("spark.master", "local")
                .set("spark.cassandra.connection.host", cassandra_host)
                .set("spark.cassandra.connection.port", "9042")
                .set("spark.sql.shuffle.partitions", "10")
               )

# Create spark session
spark = (SparkSession
         .builder
         .config(conf = spark_conf)
         .getOrCreate()
        )
sc = spark.sparkContext
sql = spark.sql
print(sc.uiWebUrl)

http://10.0.2.15:4040


In [2]:
from pyspark.sql import Row
from pyspark.sql import functions as F

In [3]:
# A one-row RDD of Row objects, used to sanity-check DataFrame creation.
rdd = sc.parallelize([Row(a=1, b=4)])

In [4]:
# Build a DataFrame from the Row RDD; column names come from the Row fields.
df = spark.createDataFrame(data=rdd)
df.show()

+---+---+
|  a|  b|
+---+---+
|  1|  4|
+---+---+



In [5]:
# Fetch only the first Row instead of collecting the whole DataFrame to the driver.
r = df.first()

In [6]:
# Row fields are addressable by key as well as by attribute.
r["a"]

1

In [7]:
# Root URI of the generated JSON files; override with DATA_BASE_PATH.
# A trailing slash is enforced because file names are appended by plain concatenation.
base_path = os.environ.get(
    "DATA_BASE_PATH",
    "file:///home/cloudera/notebooks/RandomDataGenerator-master/target/",
).rstrip("/") + "/"

In [8]:
# Spark always infers the schema of JSON sources, so no reader option is needed
# (the previous `inferSchena` option was a misspelling that Spark silently ignored).
customers = spark.read.json(base_path + "customers.json")
customers.show()
customers.printSchema()
# NOTE(review): `address` is dropped before the write, presumably because its JSON
# structure does not match the Cassandra column type — confirm against the table schema.
# Overwrite truncates cc.customer, so thresholds written by a later cell are wiped on re-run.
(customers
    .drop("address")
    .write
    .mode("overwrite")
    .format("org.apache.spark.sql.cassandra")
    .options(table = "customer", keyspace = "cc")
    # The Cassandra connector rejects Overwrite (a table truncate) unless this is set.
    .option("confirm.truncate", "true")
    .save())

+--------------------+---+----------+--------------------+----------+------+------------+--------------+
|             address|age|       dob|               email|first_name|gender|          id|     last_name|
+--------------------+---+----------+--------------------+----------+------+------------+--------------+
|[Brooklyn,Kings,L...| 19|1999-09-09| iforell@hotmail.com|    Ivania|     M|800000000000|        Forell|
|[Bronx,Bronx,LF00...| 63|1955-06-23|jodena.tetreau@ms...|    Jodena|     F|800000000001|       Tetreau|
|[Merrick,Nassau,L...| 73|1945-02-05|     sshoyko@msn.com|   Shawnic|     F|800000000002|        Shoyko|
|[New York,New Yor...| 67|1950-11-19|jerzei.berardo@gm...|    Jerzei|     F|800000000003|       Berardo|
|[Saratoga Springs...| 28|1990-06-05|einzinger@hotmail...|   Elinora|     M|800000000004|      Inzinger|
|[Rome,Oneida,LF00...| 34|1984-09-28|bschillinglaw@msn...|   Braylyn|     F|800000000005|  Schillinglaw|
|[New York,New Yor...| 61|1957-01-24| hsherwood@gmail.c

In [9]:
def cass_table(table_name, keyspace="cc", session=None):
    """Load a Cassandra table as a Spark DataFrame via the Cassandra connector.

    Args:
        table_name: name of the Cassandra table to read.
        keyspace: Cassandra keyspace containing the table (defaults to "cc").
        session: SparkSession to read with; defaults to the notebook's global `spark`.

    Returns:
        The DataFrame produced by the connector's `load()`.
    """
    # Only look up the global when no session was supplied.
    active = session if session is not None else spark
    return (active
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(table = table_name, keyspace = keyspace)
        .load())

In [10]:
# Inspect the customer table as currently stored in Cassandra.
cass_table(table_name="customer").show()

+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+
|          id|address|age|amount_lower_threshold|amount_upper_threshold|       dob|               email|first_name|gender|    last_name|
+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+
|800000002686|   null| 79|                  null|                  null|1939-03-15|clea.eckenbrecht@...|      Clea|     M|  Eckenbrecht|
|800000009014|   null| 22|                  null|                  null|1996-03-29|deondrick.manopin...| Deondrick|     F|  Manopinives|
|800000003508|   null| 33|                  null|                  null|1985-04-26|jchristophides@ms...|  Jazabell|     F|Christophides|
|800000009097|   null| 74|                  null|                  null|1944-05-26|kiptyn.chato@hotm...|    Kiptyn|     F|        Chato|
|800000003132|   null| 76|               

In [11]:
# JSON sources always get an inferred schema; the old `inferSchena` option was a no-op typo.
merchants = spark.read.json(base_path + "merchants.json")
merchants.show()
merchants.printSchema()
(merchants
    .write
    .mode("overwrite")
    .format("org.apache.spark.sql.cassandra")
    .options(table = "merchant", keyspace = "cc")
    # The Cassandra connector rejects Overwrite (a table truncate) unless this is set.
    .option("confirm.truncate", "true")
    .save())

+------------+--------------------+
|          id|                name|
+------------+--------------------+
|DZ0000000000|iShares 7-10 Year...|
|DZ0000000001|National American...|
|DZ0000000002|Jensyn Acquistion...|
|DZ0000000003|     Interface, Inc.|
|DZ0000000004| FTD Companies, Inc.|
|DZ0000000005|NextDecade Corpor...|
|DZ0000000006|  MakeMyTrip Limited|
|DZ0000000007|Dynavax Technolog...|
|DZ0000000008|        HyreCar Inc.|
|DZ0000000009|Highland/iBoxx Se...|
|DZ0000000010|  Liberty Global plc|
|DZ0000000011|Green Plains Part...|
|DZ0000000012|  Vertex Energy, Inc|
|DZ0000000013|     Fuel Tech, Inc.|
|DZ0000000014|    TiVo Corporation|
|DZ0000000015|        Cerecor Inc.|
|DZ0000000016|    SMTC Corporation|
|DZ0000000017|Magellan Health, ...|
|DZ0000000018|      Check-Cap Ltd.|
|DZ0000000019|Village Bank and ...|
+------------+--------------------+
only showing top 20 rows

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [12]:
# Read the merchant table back through the shared helper instead of repeating the reader chain.
cass_table("merchant").show()

+------------+-------+--------------------+------+
|          id|address|                name|ticker|
+------------+-------+--------------------+------+
|DZ0000000059|   null|Synergy Pharmaceu...|  null|
|DZ0000000046|   null|Northern Trust Co...|  null|
|DZ0000000020|   null|Gladstone Investm...|  null|
|DZ0000000093|   null|Golden Ocean Grou...|  null|
|DZ0000000008|   null|        HyreCar Inc.|  null|
|DZ0000000090|   null|Allegiance Bancsh...|  null|
|DZ0000000027|   null| iShares Asia 50 ETF|  null|
|DZ0000000019|   null|Village Bank and ...|  null|
|DZ0000000043|   null|       Popular, Inc.|  null|
|DZ0000000074|   null|1347 Property Ins...|  null|
|DZ0000000065|   null|Invesco Dividend ...|  null|
|DZ0000000025|   null|RXi Pharmaceutica...|  null|
|DZ0000000037|   null|Sierra Wireless, ...|  null|
|DZ0000000041|   null|  Cool Holdings Inc.|  null|
|DZ0000000054|   null|Willis Lease Fina...|  null|
|DZ0000000058|   null|Priority Technolo...|  null|
|DZ0000000038|   null|Mitcham I

In [13]:
# Raw `timestamp` is divided by 10**9 and truncated to whole seconds
# (NOTE(review): presumably nanoseconds since the epoch — confirm with the data generator).
# It is cast to a real TimestampType rather than from_unixtime's formatted string, so the
# Cassandra write does not rely on string-to-timestamp parsing.
# JSON schema is always inferred; the old `inferSchena` option was a no-op typo.
transactions = (spark
                .read
                .json(base_path + "transactions.json")
                .withColumn("timestamp",
                            (F.col("timestamp") / 10**9).cast("bigint").cast("timestamp"))
               )
transactions.show()
transactions.printSchema()

+------------------+--------+------------+------------+------------+-------------------+
|            amount|category| customer_id|          id| merchant_id|          timestamp|
+------------------+--------+------------+------------+------------+-------------------+
| 802.7103986581193|     atm|800000002081|690000000000|DZ0000000085|2018-08-31 16:45:00|
|507.74628999676526|     web|800000005031|690000000001|DZ0000000058|2018-07-16 01:53:31|
| 606.4194934089878|     pos|800000003872|690000000002|DZ0000000063|2018-05-17 14:57:21|
| 319.4347823556573|     web|800000004311|690000000003|DZ0000000065|2018-08-07 19:28:49|
| 6.499356159099801|  mobile|800000001603|690000000004|DZ0000000061|2018-07-05 13:25:49|
|1202.5906039488204|     web|800000008644|690000000005|DZ0000000045|2018-09-24 08:59:37|
| 334.1742337330727|  mobile|800000009101|690000000006|DZ0000000023|2018-05-27 18:44:58|
|1276.2935468125615|     atm|800000003658|690000000007|DZ0000000003|2018-08-18 22:33:10|
|1014.0822024764368| 

In [14]:
# Replace the contents of cc.transactions with the freshly loaded JSON data.
(transactions
    .write
    .mode("overwrite")
    .format("org.apache.spark.sql.cassandra")
    .options(table = "transactions", keyspace = "cc")
    # The Cassandra connector rejects Overwrite (a table truncate) unless this is set.
    .option("confirm.truncate", "true")
    .save())

In [15]:
# Reload from Cassandra so downstream cells see the table's full column set
# (it has more columns than the JSON source, e.g. flags and score).
transactions = cass_table("transactions")
transactions.limit(10).toPandas()

Unnamed: 0,customer_id,timestamp,id,amount,category,flag_ml,flag_threshold,location,location_id,merchant_id,overruled,overruled_comment,overruled_date,score
0,800000002686,2018-10-01 15:31:58,690000204866,2131.528888,web,,,,,DZ0000000022,,,,
1,800000002686,2018-09-28 09:00:29,690000628963,5158.127424,mobile,,,,,DZ0000000010,,,,
2,800000002686,2018-09-25 11:15:10,690000463846,2899.654655,mobile,,,,,DZ0000000003,,,,
3,800000002686,2018-09-25 00:54:22,690000257935,1753.912085,web,,,,,DZ0000000049,,,,
4,800000002686,2018-09-23 00:36:28,690000152367,1528.890501,pos,,,,,DZ0000000064,,,,
5,800000002686,2018-09-17 14:44:08,690000550928,3403.744026,web,,,,,DZ0000000094,,,,
6,800000002686,2018-09-16 15:56:52,690000176866,1851.144587,atm,,,,,DZ0000000083,,,,
7,800000002686,2018-09-16 15:00:32,690000922612,7191.043201,mobile,,,,,DZ0000000056,,,,
8,800000002686,2018-09-15 17:41:44,690000004149,980.288949,web,,,,,DZ0000000043,,,,
9,800000002686,2018-09-11 08:23:49,690000652705,4919.487656,web,,,,,DZ0000000083,,,,


In [16]:
# Per-customer spend statistics; the "normal" band is mean ± one standard deviation.
# NOTE(review): F.stddev is the sample std-dev, so customers with a single transaction
# get no usable std (NaN/null) and therefore no usable thresholds — confirm that is acceptable.
agg = (
    transactions
    .groupBy("customer_id")
    .agg(
        F.avg("amount").alias("amount_avg"),
        F.stddev("amount").alias("amount_std"),
    )
    .withColumn("amount_upper_threshold", F.col("amount_avg") + F.col("amount_std"))
    .withColumn("amount_lower_threshold", F.col("amount_avg") - F.col("amount_std"))
)

agg.show()

+------------+------------------+------------------+----------------------+----------------------+
| customer_id|        amount_avg|        amount_std|amount_upper_threshold|amount_lower_threshold|
+------------+------------------+------------------+----------------------+----------------------+
|800000003508|4000.8447911541843|2030.5769053432075|     6031.421696497392|    1970.2678858109768|
|800000002892|4015.4699165463403|1879.4048226518348|     5894.874739198175|    2136.0650938945055|
|800000006164|  4064.89169687135|1818.1123446674483|     5883.004041538798|     2246.779352203902|
|800000004876|4330.1780265902435|1915.2435772743434|     6245.421603864586|       2414.9344493159|
|800000003869|3424.0908568407763| 1755.596040364472|     5179.686897205248|    1668.4948164763043|
|800000004211|4046.3803344300945|1711.4403921994372|     5757.820726629532|    2334.9399422306574|
|800000004464| 4270.542190952598|2010.1792401752057|     6280.721431127803|     2260.362950777392|
|800000004

In [17]:
# Upsert per-customer thresholds into cc.customer. Append mode writes only these columns,
# so the existing customer attributes stay in place.
(agg
    .select(
        F.col("customer_id").alias("id"),
        "amount_upper_threshold",
        "amount_lower_threshold",
    )
    .write
    .format("org.apache.spark.sql.cassandra")
    .mode("append")
    .options(keyspace = "cc", table = "customer")
    .save())

In [18]:
# Verify that the threshold columns are now populated.
cass_table(table_name="customer").show()

+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+
|          id|address|age|amount_lower_threshold|amount_upper_threshold|       dob|               email|first_name|gender|    last_name|
+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+
|800000002686|   null| 79|    1873.6919950506053|     5750.826738407709|1939-03-15|clea.eckenbrecht@...|      Clea|     M|  Eckenbrecht|
|800000009014|   null| 22|    2036.0152859916593|     5845.483106360582|1996-03-29|deondrick.manopin...| Deondrick|     F|  Manopinives|
|800000003508|   null| 33|    1970.2678858109768|     6031.421696497392|1985-04-26|jchristophides@ms...|  Jazabell|     F|Christophides|
|800000009097|   null| 74|    1860.8318397319022|     5882.868594280499|1944-05-26|kiptyn.chato@hotm...|    Kiptyn|     F|        Chato|
|800000003132|   null| 76|    2023.022367

In [22]:
from cassandra.cluster import Cluster

In [23]:
# Direct CQL access via the Python driver, pointed at the same host Spark uses.
# NOTE(review): this cluster is left open for interactive use; call cluster.shutdown() when done.
cluster = Cluster([cassandra_host])
cass = cluster.connect("cc")
cass.execute("select amount_lower_threshold from customer where id = %s", ('800000009260',)).one()

Row(amount_lower_threshold=2406.1891696538737)

In [24]:


rdd = sc.parallelize(['800000009260', '800000001652', '800000005063'])

def detect_anomalies(tnx):
    """Look up the stored lower amount threshold for each customer id in one partition.

    Intended for ``RDD.mapPartitions``: it opens one Cassandra connection per partition
    rather than one per element.

    Args:
        tnx: iterable of customer id strings (the partition's elements).

    Returns:
        list of ``(customer_id, amount_lower_threshold)`` tuples, one per input id. The
        threshold is ``None`` when no customer row exists for that id.
    """
    cluster = Cluster([cassandra_host])
    try:
        session = cluster.connect("cc")
        # Prepare once per partition; each lookup then only binds the id.
        lookup = session.prepare(
            "select id, amount_lower_threshold from customer where id = ?")
        result = []
        for customer_id in tnx:
            rec = session.execute(lookup, (customer_id,)).one()
            threshold = rec.amount_lower_threshold if rec is not None else None
            result.append((customer_id, threshold))
        return result
    finally:
        # Release connections even if a query fails; Cluster.shutdown also closes its sessions.
        cluster.shutdown()

rdd.mapPartitions(detect_anomalies).collect()

[('800000009260', 2406.1891696538737),
 ('800000001652', 1999.5693000247313),
 ('800000005063', 2129.9385858059727)]