<a href="https://colab.research.google.com/github/JohnRTurner/JohnRTurner.github.io/blob/master/Benford%E2%80%99s_Law.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prerequisites

In [10]:
#@title Prerequisites
#@markdown **Run first!** This section will pre-load all required resources and setup global parameters
!pip install pandas pymysql sqlalchemy singlestoredb pyspark --quiet

import pymysql
import pandas as pd
from pandas.io import sql
from sqlalchemy import Column, VARCHAR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
import sqlalchemy
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import max


# google.colab is only importable inside a Colab runtime. Anywhere else, keep
# pandas' default DataFrame rendering instead of aborting the whole setup cell.
try:
    from google.colab import data_table
except ImportError:
    IN_COLAB = False
else:
    # Render DataFrames with Colab's interactive table widget.
    data_table.enable_dataframe_formatter()
    IN_COLAB = True


# Database settings.
# Every value can be overridden through an environment variable so that real
# credentials never have to be written into the notebook; the fallbacks are
# the values this notebook originally hardcoded.
# NOTE(review): the fallback password is still stored here in clear text --
# drop the default / rotate it unless this is a throwaway demo account.
import os
from urllib.parse import quote_plus

UserName = os.environ.get('SINGLESTORE_USER', 'admin')
Password = os.environ.get('SINGLESTORE_PASSWORD', 'S1ngleSt0r31')
DatabaseName = os.environ.get('SINGLESTORE_DATABASE', 'order_mgt')
# Host and port of the SingleStore endpoint, in "host:port" form.
URL = os.environ.get(
    'SINGLESTORE_URL',
    'svc-f30b238f-f674-4507-ba11-e65b95c4bc88-dml.aws-oregon-2.svc.singlestore.com:3306')

# Creating the database connection.
# User name and password are percent-encoded so characters such as '@', ':'
# or '/' cannot corrupt the connection URL. The unencoded values stay in
# UserName / Password because the Spark connector settings use them as-is.
db_connection_str = (
    "mysql+pymysql://" + quote_plus(UserName) + ":" + quote_plus(Password)
    + "@" + URL + "/" + DatabaseName
)
db_connection = create_engine(db_connection_str)


# Set up Spark: a session on the "local[*]" master, with the SingleStore
# connector and sparkMeasure jars requested through spark.jars.packages.
spark_builder = SparkSession.builder.master("local[*]")
spark_builder = spark_builder.config(
    'spark.jars.packages',
    'com.singlestore:singlestore-spark-connector_2.12:4.1.1-spark-3.3.0,ch.cern.sparkmeasure:spark-measure_2.12:0.21',
)
spark = spark_builder.getOrCreate()

# Hand the SingleStore data source the endpoint, credentials and database
# defined in the database settings (the same URL serves DDL and DML).
singlestore_conf = {
    "spark.datasource.singlestore.ddlEndpoint": URL,
    "spark.datasource.singlestore.dmlEndpoints": URL,
    "spark.datasource.singlestore.user": UserName,
    "spark.datasource.singlestore.password": Password,
    "spark.datasource.singlestore.database": DatabaseName,
}
for conf_key, conf_value in singlestore_conf.items():
    spark.conf.set(conf_key, conf_value)

# Benford's Law

In [11]:
#@title Benford's Law
#@markdown **Calculate Benford's Law in SingleStore!** Display Graphs and Table Below

# The aggregation runs entirely in the database; pandas receives one row per
# leading digit.
#   * The CTE groups lineitem rows (ABS(l_extendedprice) > 1) by the first
#     character of ABS(l_extendedprice) and computes, per group, the row count,
#     its percentage of all counted rows, and the mean / standard deviation of
#     the per-digit counts.
#   * The outer SELECT adds the expected percentage 100 * log10(1 + 1/digit)
#     and Z = (count - mean of counts) / standard deviation of counts.
benford_sql = (
    'WITH DIGITS_CTE(FirstDigit, ActualOcurrance, Total, PCT, AVG, STD ) AS '
    '(SELECT SUBSTRING(ABS(l_extendedprice),1,1) AS FirstDigit, '
    '        COUNT(*) ActualOcurrance, SUM(COUNT(*)) OVER () Total, '
    '        100.0 * COUNT(*) / SUM(COUNT(*)) OVER () AS PCT, '
    '        AVG(COUNT(*)) OVER () AS AVG,  STD(COUNT(*)) OVER () AS STD '
    '  FROM lineitem AS sd '
    ' WHERE ABS(sd.l_extendedprice) > 1 '
    ' GROUP BY SUBSTRING(ABS(l_extendedprice),1,1)) '
    'SELECT FirstDigit, ActualOcurrance, PCT as BenfordsLawCalc, '
    '        CASE FirstDigit  WHEN 0 THEN 0.0 ELSE 100 * (Log(1.0 + (1.0 / FirstDigit)) / Log(10)) END AS BenfordsLawStd, '
    '        (ActualOcurrance-AVG)/STD AS Z '
    ' FROM DIGITS_CTE AS ResultsTable '
    ' ORDER BY FirstDigit '
)
DataFromDB = pd.read_sql(benford_sql, con=db_connection)

digit_axis_label = "First Digit of Extended Price"

# Observed share (BenfordsLawCalc) against the expected share (BenfordsLawStd).
fig = px.line(
    DataFromDB,
    x="FirstDigit",
    y=["BenfordsLawCalc", "BenfordsLawStd"],
    labels={
        "variable": "",
        "value": "Percentage",
        "FirstDigit": digit_axis_label,
    },
    title="Audit Check of LineItem Extended Price Benford's Law",
)

# Line chart of the Z column; it is built here but not displayed.
fig2 = px.line(
    DataFromDB,
    x="FirstDigit",
    y="Z",
    labels={"FirstDigit": digit_axis_label},
    title="Audit Check of LineItem Extended Price Benford's Law Z",
)

fig.show()

DataFromDB


Unnamed: 0,FirstDigit,ActualOcurrance,BenfordsLawCalc,BenfordsLawStd,Z
0,1,85612121,15.88829,30.103,1.004875
1,2,80513973,14.94215,17.609126,0.805856
2,3,83576230,15.51046,12.493874,0.925399
3,4,81063877,15.0442,9.691001,0.827323
4,5,73182362,13.58152,7.918125,0.519648
5,6,54442983,10.10378,6.694679,-0.211892
6,7,40015979,7.42635,5.799195,-0.775086
7,8,25689057,4.76749,5.115252,-1.334374
8,9,14741320,2.73576,4.575749,-1.761747


# Spark

In [12]:
#@title Spark Database Test
#@markdown **Test Spark with database**

# Connector switches for this test: both options are set to False.
connector_flags = {
    "spark.datasource.singlestore.enableParallelRead": False,
    "spark.datasource.singlestore.disablePushdown": False,
}
for flag_name, flag_value in connector_flags.items():
    spark.conf.set(flag_name, flag_value)

# Read the `supplier` table through the SingleStore data source.
# (An ad-hoc statement could be read instead by passing it via
# .option("query", ...) before .load().)
supplier_reader = spark.read.format("singlestore")
df = supplier_reader.load("supplier")

df.printSchema()
df.show()
supplier_count = df.count()
print("Count: ", supplier_count)

# `max` is pyspark.sql.functions.max (imported in the prerequisites cell),
# not the Python builtin. The aggregate is collected and its first row's
# first column is the maximum s_acctbal.
balance_rows = df.agg(max("s_acctbal")).collect()
maxBal = balance_rows[0][0]
print("maxBal: ", maxBal)

root
 |-- s_suppkey: integer (nullable = true)
 |-- s_name: string (nullable = true)
 |-- s_address: string (nullable = true)
 |-- s_nationkey: integer (nullable = true)
 |-- s_phone: string (nullable = true)
 |-- s_acctbal: decimal(15,2) (nullable = true)
 |-- s_comment: string (nullable = true)

+---------+------------------+--------------------+-----------+---------------+---------+--------------------+
|s_suppkey|            s_name|           s_address|s_nationkey|        s_phone|s_acctbal|           s_comment|
+---------+------------------+--------------------+-----------+---------------+---------+--------------------+
|     5700|Supplier#000005700|LsNhljBonNCGLXqM2...|         18|28-416-627-2570|  7019.71|sly bold deposits...|
|     9979|Supplier#000009979| Ote0EB9LmVAeCZHyK K|          0|10-250-898-7175|  6390.95|ts cajole furious...|
|    11636|Supplier#000011636|wNnrkPkb4wTu3VDth...|         20|30-431-361-9154|  7068.64|ng ideas doze sly...|
|    14869|Supplier#000014869|jvCvS

# Other

In [None]:
#@title Time Series CandleStick
#@markdown **Calculate CandleStick from Time Series** Display Graph and Table Below

# One candle per 14-second TIME_BUCKET of engine_rpm for a single vehicle:
#   Open / Close -> first(...) / last(...) of engine_rpm by time_stamp,
#   Low / High   -> MIN / MAX of engine_rpm within the bucket.
# At most 10000 buckets are returned, ordered by bucket start.
#
# Both date bounds are zero-padded ISO dates. The upper bound used to be
# '2021-9-04'; if `date` is compared as text, that value sorts after
# '2021-10-..' .. '2021-12-..' and the filter would span far more than one day.
# Wider ranges tried previously: '2020-07-01'..'2021-10-01' and
# '2021-09-01'..'2021-10-01'.
candlestick_sql = (
    "SELECT TIME_BUCKET('14s',time_stamp) as Date, "
    "vin_number, "
    "first(engine_rpm, time_stamp) as Open, "
    "MIN(engine_rpm) as Low, MAX(engine_rpm) as High, "
    "last(engine_rpm, time_stamp) as Close  "
    "FROM vehicle.vehicle_time_series  "
    "WHERE date between '2021-09-03' and '2021-09-04'  "
    "  AND   vin_number = 'E591709D182B499E8'  "
    "GROUP BY 1, 2 ORDER BY 1 limit 10000"
)
DataFromDB = pd.read_sql(candlestick_sql, con=db_connection)


# Map the query's Date/Open/High/Low/Close columns onto a candlestick trace.
fig = go.Figure(data=[go.Candlestick(x=DataFromDB['Date'],
                open=DataFromDB['Open'],
                high=DataFromDB['High'],
                low=DataFromDB['Low'],
                close=DataFromDB['Close'])])

fig.show()

DataFromDB
