In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
load_dotenv()

# Initialize a Spark session
spark = (SparkSession.builder
    .appName("Snowflake to PySpark")
    # .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.2") 
    .master("local[*]")
    .getOrCreate())

# Display the Spark session 
spark

In [None]:
# Snowflake connection options
sfOptions = {
    "sfURL": f"https://{os.getenv('ACCOUNT')}.snowflakecomputing.com",
    "sfUser": os.getenv('USER_NAME'),
    "sfPassword": os.getenv('PASSWORD'),
    "sfDatabase": "TEST",
    "sfSchema": "TEST",
    "sfWarehouse": "compute_wh",
    "sfRole": "ACCOUNTADMIN"
}

# Read the data from Snowflake table into a DataFrame
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "TEST.TEST.CREDIT_CARD") \
    .load()

# Register the DataFrame as a temporary SQL view for processing
df.createOrReplaceTempView("CREDIT_CARD")


In [4]:
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+-----+
|    TIME|                 V1|                 V2|                V3|                 V4|                V5|                V6|                V7|                 V8|                V9|                V10|               V11|                V12|               V13|                V14|                V15|               V16|                V17|               V18|                V19|                V20|                 V21|  

                                                                                

In [5]:
df.count()

                                                                                

313287

In [6]:
df.write.format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "TEST.TEST.CREDIT_CARD_V2") \
    .mode("overwrite") \
    .save()

24/08/10 20:38:31 WARN SnowflakeConnectorUtils$: Query pushdown is not supported because you are using Spark 3.0.1 with a connector designed to support Spark 3.2. Either use the version of Spark supported by the connector or install a version of the connector that supports your version of Spark.
24/08/10 20:39:05 WARN SnowflakeConnectorUtils$: Query pushdown is not supported because you are using Spark 3.0.1 with a connector designed to support Spark 3.2. Either use the version of Spark supported by the connector or install a version of the connector that supports your version of Spark.


In [7]:
spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "TEST.TEST.CREDIT_CARD_V2") \
    .load().count()

24/08/10 20:39:07 WARN SnowflakeConnectorUtils$: Query pushdown is not supported because you are using Spark 3.0.1 with a connector designed to support Spark 3.2. Either use the version of Spark supported by the connector or install a version of the connector that supports your version of Spark.
                                                                                

313287

In [2]:
df = spark.read.csv(r"C:\Users\durai\Downloads\LI-Small_Trans.csv\LI-Small_Trans.csv",header=True)
df.cache().count()

6924049

In [12]:

df.select(substring("timestamp", 1, 10).alias('date')).groupBy('date').agg(count('*')).show(1000,False)

+----------+--------+
|date      |count(1)|
+----------+--------+
|2022/09/02|1027758 |
|2022/09/03|283326  |
|2022/09/06|657170  |
|2022/09/05|657397  |
|2022/09/01|1524807 |
|2022/09/04|282476  |
|2022/09/10|282877  |
|2022/09/08|657938  |
|2022/09/09|891573  |
|2022/09/07|658504  |
|2022/09/11|77      |
|2022/09/13|48      |
|2022/09/12|48      |
|2022/09/15|9       |
|2022/09/14|31      |
|2022/09/16|7       |
|2022/09/17|3       |
+----------+--------+



In [16]:
(
    df
    .withColumn('date',substring("timestamp", 1, 10))
    .filter(col('date') < '2022/09/10')
    .groupBy('date')
    .agg(count('*'))
    .sort('date')
    .show(1000,False)
)

+----------+--------+
|date      |count(1)|
+----------+--------+
|2022/09/01|1524807 |
|2022/09/02|1027758 |
|2022/09/03|283326  |
|2022/09/04|282476  |
|2022/09/05|657397  |
|2022/09/06|657170  |
|2022/09/07|658504  |
|2022/09/08|657938  |
|2022/09/09|891573  |
+----------+--------+



In [22]:
df.filter(col('Is Laundering') == 1).show(truncate=False)
df.filter(col('Is Laundering') == 1).count() 

+----------------+---------+---------+-------+---------+---------------+------------------+------------+----------------+--------------+-------------+
|Timestamp       |From Bank|Account2 |To Bank|Account4 |Amount Received|Receiving Currency|Amount Paid |Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+------------+----------------+--------------+-------------+
|2022/09/01 00:02|070      |10042B660|022661 |805F7F2B0|70831.64       |US Dollar         |70831.64    |US Dollar       |Cash          |1            |
|2022/09/01 00:19|070      |10042B660|0212996|80609B4C0|33705.47       |US Dollar         |33705.47    |US Dollar       |Cash          |1            |
|2022/09/01 00:01|070      |10042B660|0011305|807861770|1097976.27     |US Dollar         |1097976.27  |US Dollar       |Cash          |1            |
|2022/09/01 00:00|011968   |815630C40|0249349|815635220|892.33         |US Dollar         |892

3565

In [21]:
(
    df
    .withColumn('date',substring("timestamp", 1, 10))
    .filter((col('date') < '2022/09/10') & (col('Is Laundering') == 1))
    .groupBy('date')
    .agg(count('*'))
    .sort('date')
    .show(1000,False)
)

+----------+--------+
|date      |count(1)|
+----------+--------+
|2022/09/01|310     |
|2022/09/02|350     |
|2022/09/03|292     |
|2022/09/04|291     |
|2022/09/05|370     |
|2022/09/06|380     |
|2022/09/07|368     |
|2022/09/08|402     |
|2022/09/09|367     |
+----------+--------+



In [22]:
# load_dt, tran_dt, load_id
spark.conf.set("fs.defaultFS", "file:///")
spark.conf.set("spark.hadoop.io.native.lib.available", "false")
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql.functions import col,substring,count
(
    df
    .na.fill('')
    .coalesce(1)
    .filter((substring("timestamp", 1, 10) <= '2022/09/03'))
    .write
    .option("header","true")
    .format("csv")
    .save(r"data\transactions\transactions_load_1")
)


#merge resolve

Py4JJavaError: An error occurred while calling o230.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1218)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1423)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$25(FileFormatWriter.scala:267)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:267)
	... 41 more
