In [11]:
import findspark
findspark.init("/opt/spark")

In [16]:
!hdfs dfs -mkdir /tmp/hdfs-cli-example2

mkdir: `/tmp/hdfs-cli-example2': File exists


In [17]:
!hdfs dfs -put bank_transactions_data_2.csv /tmp/hdfs-cli-example2/

In [18]:
! hdfs dfs -ls /tmp/hdfs-cli-example2

Found 1 items
-rw-r--r--   2 jupyter supergroup     344980 2024-12-04 12:34 /tmp/hdfs-cli-example2/bank_transactions_data_2.csv


In [19]:
from pyspark.sql import SparkSession

In [20]:
spark = SparkSession.builder\
.enableHiveSupport()\
.getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [48]:
spark.sql(""" 
CREATE EXTERNAL TABLE transactions (
    TransactionID STRING,
    AccountID STRING,
    TransactionAmount DECIMAL(10,2),
    TransactionDate TIMESTAMP,
    TransactionType STRING,
    Location STRING,
    DeviceID STRING,
    IP_Address STRING,
    MerchantID STRING,
    Channel STRING,
    CustomerAge INT,
    CustomerOccupation STRING,
    TransactionDuration INT,
    LoginAttempts INT,
    AccountBalance DECIMAL(10,2),
    PreviousTransactionDate TIMESTAMP
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = ","
)
STORED AS TEXTFILE
LOCATION "/tmp/hdfs-cli-example2/"
TBLPROPERTIES ("skip.header.line.count" = "1");


""")

DataFrame[]

In [52]:
!hdfs dfs -mkdir /tmp/dz2/transactions_part

In [53]:
spark.sql(""" 
CREATE TABLE transactions_partitioned (
    TransactionID STRING,
    AccountID STRING,
    TransactionAmount DECIMAL(10,2),
    TransactionDate TIMESTAMP,
    TransactionType STRING,
    Location STRING,
    DeviceID STRING,
    IP_Address STRING,
    MerchantID STRING,
    Channel STRING,
    CustomerAge INT,
    CustomerOccupation STRING,
    TransactionDuration INT,
    LoginAttempts INT,
    AccountBalance DECIMAL(10,2),
    PreviousTransactionDate TIMESTAMP
)
PARTITIONED BY (year INT)
STORED AS TEXTFILE
LOCATION '/tmp/dz2/transactions_part';

""")

DataFrame[]

In [60]:
spark.sql(""" 
INSERT INTO TABLE transactions_partitioned PARTITION (year = 2023)
SELECT 
    TransactionID,
    AccountID,
    CAST(TransactionAmount AS DECIMAL(10,2)) AS TransactionAmount,
    CAST(TransactionDate AS TIMESTAMP) AS TransactionDate,
    TransactionType,
    Location,
    DeviceID,
    IP_Address,
    MerchantID,
    Channel,
    CAST(CustomerAge AS INT) AS CustomerAge,
    CustomerOccupation,
    CAST(TransactionDuration AS INT) AS TransactionDuration,
    CAST(LoginAttempts AS INT) AS LoginAttempts,
    CAST(AccountBalance AS DECIMAL(10,2)) AS AccountBalance,
    CAST(PreviousTransactionDate AS TIMESTAMP) AS PreviousTransactionDate
FROM transactions
WHERE YEAR(CAST(TransactionDate AS TIMESTAMP)) = 2023
""")

24/12/05 06:10:26 WARN HiveExternalCatalog: The table schema given by Hive metastore(struct<transactionid:string,accountid:string,transactionamount:string,transactiondate:string,transactiontype:string,location:string,deviceid:string,ip_address:string,merchantid:string,channel:string,customerage:string,customeroccupation:string,transactionduration:string,loginattempts:string,accountbalance:string,previoustransactiondate:string>) is different from the schema when this table was created by Spark SQL(struct<TransactionID:string,AccountID:string,TransactionAmount:decimal(10,2),TransactionDate:timestamp,TransactionType:string,Location:string,DeviceID:string,IP_Address:string,MerchantID:string,Channel:string,CustomerAge:int,CustomerOccupation:string,TransactionDuration:int,LoginAttempts:int,AccountBalance:decimal(10,2),PreviousTransactionDate:timestamp>). We have to fall back to the table schema from Hive metastore which is not case preserving.
                                                

DataFrame[]

In [61]:
spark.sql(""" 
INSERT INTO TABLE transactions_partitioned PARTITION (year = 2024)
SELECT 
    TransactionID,
    AccountID,
    CAST(TransactionAmount AS DECIMAL(10,2)) AS TransactionAmount,
    CAST(TransactionDate AS TIMESTAMP) AS TransactionDate,
    TransactionType,
    Location,
    DeviceID,
    IP_Address,
    MerchantID,
    Channel,
    CAST(CustomerAge AS INT) AS CustomerAge,
    CustomerOccupation,
    CAST(TransactionDuration AS INT) AS TransactionDuration,
    CAST(LoginAttempts AS INT) AS LoginAttempts,
    CAST(AccountBalance AS DECIMAL(10,2)) AS AccountBalance,
    CAST(PreviousTransactionDate AS TIMESTAMP) AS PreviousTransactionDate
FROM transactions
WHERE YEAR(CAST(TransactionDate AS TIMESTAMP)) = 2024
""")

24/12/05 06:10:42 WARN HiveExternalCatalog: The table schema given by Hive metastore(struct<transactionid:string,accountid:string,transactionamount:string,transactiondate:string,transactiontype:string,location:string,deviceid:string,ip_address:string,merchantid:string,channel:string,customerage:string,customeroccupation:string,transactionduration:string,loginattempts:string,accountbalance:string,previoustransactiondate:string>) is different from the schema when this table was created by Spark SQL(struct<TransactionID:string,AccountID:string,TransactionAmount:decimal(10,2),TransactionDate:timestamp,TransactionType:string,Location:string,DeviceID:string,IP_Address:string,MerchantID:string,Channel:string,CustomerAge:int,CustomerOccupation:string,TransactionDuration:int,LoginAttempts:int,AccountBalance:decimal(10,2),PreviousTransactionDate:timestamp>). We have to fall back to the table schema from Hive metastore which is not case preserving.


DataFrame[]

In [63]:
spark.sql(""" 
SELECT 
    TransactionID ,
    AccountID ,
    TransactionAmount ,
    TransactionDate ,
    TransactionType ,
    Location ,
    DeviceID ,
    IP_Address ,
    MerchantID ,
    Channel ,
    CustomerAge ,
    CustomerOccupation ,
    TransactionDuration ,
    LoginAttempts ,
    AccountBalance ,
    PreviousTransactionDate
FROM transactions_partitioned
""").show(100)

+-------------+---------+-----------------+-------------------+---------------+--------------+--------+---------------+----------+-------+-----------+------------------+-------------------+-------------+--------------+-----------------------+
|TransactionID|AccountID|TransactionAmount|    TransactionDate|TransactionType|      Location|DeviceID|     IP_Address|MerchantID|Channel|CustomerAge|CustomerOccupation|TransactionDuration|LoginAttempts|AccountBalance|PreviousTransactionDate|
+-------------+---------+-----------------+-------------------+---------------+--------------+--------+---------------+----------+-------+-----------+------------------+-------------------+-------------+--------------+-----------------------+
|     TX000001|  AC00128|            14.09|2023-04-11 16:29:14|          Debit|     San Diego| D000380| 162.198.218.92|      M015|    ATM|         70|            Doctor|                 81|            1|       5112.21|    2024-11-04 08:08:08|
|     TX000002|  AC00455|   