In [2]:
import numpy as np

arr = np.array((1, 2, 3, 4, 5))

print(arr)

[1 2 3 4 5]


In [3]:
import numpy as np

arr = np.array(42)

print(arr)

42


In [4]:
import numpy as np

arr = np.array([1, 2, 3, 4, 5])

print(arr)

[1 2 3 4 5]


In [5]:
import numpy as np

arr = np.array([[1, 2, 3], [4, 5, 6]])

print(arr)

[[1 2 3]
 [4 5 6]]


In [6]:
import numpy as np

arr = np.array([[[1, 2, 3], [4, 5, 6]], [[1, 2, 3], [4, 5, 6]]])

print(arr)

[[[1 2 3]
  [4 5 6]]

 [[1 2 3]
  [4 5 6]]]


In [7]:
a = np.array(42)
b = np.array([1, 2, 3, 4, 5])
c = np.array([[1, 2, 3], [4, 5, 6]])
d = np.array([[[1, 2, 3], [4, 5, 6]], [[1, 2, 3], [4, 5, 6]]])

print(a.ndim)
print(b.ndim)
print(c.ndim)
print(d.ndim)

0
1
2
3


In [8]:
arr = np.array([1, 2, 3, 4], ndmin=5)

print(arr)
print('number of dimensions :', arr.ndim)

[[[[[1 2 3 4]]]]]
number of dimensions : 5


In [9]:
arr = np.array([1, 2, 3, 4])

print(arr[0])

1


In [10]:

arr = np.array([1, 2, 3, 4])

print(arr[1])

2


In [11]:

arr = np.array([1, 2, 3, 4])

print(arr[2] + arr[3])

7


In [12]:
arr = np.array([[1,2,3,4,5], [6,7,8,9,10]])

print('2nd element on 1st row: ', arr[0, 1])

2nd element on 1st row:  2


In [13]:
import pandas

mydataset = {
  'cars': ["BMW", "Volvo", "Ford"],
  'passings': [3, 7, 2]
}

myvar = pandas.DataFrame(mydataset)

print(myvar)

    cars  passings
0    BMW         3
1  Volvo         7
2   Ford         2


In [15]:
import pandas as pd

a = [1, 7, 2]

myvar = pd.Series(a)

print(myvar)

0    1
1    7
2    2
dtype: int64


In [16]:
import pandas as pd

calories = {"day1": 420, "day2": 380, "day3": 390}

myvar = pd.Series(calories)

print(myvar)

day1    420
day2    380
day3    390
dtype: int64


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
import mysql.connector
import os  # Import the os module
import socket # Import socket

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CreditCardCapstoneETL") \
    .config("spark.driver.extraClassPath", "/path/to/mysql-connector-java.jar") \
    .getOrCreate()

# Get the absolute paths to the JSON files
branch_file_path = "cdw_sapp_branch.json"
customer_file_path = "cdw_sapp_customer.json"
credit_file_path = "cdw_sapp_credit.json"

branch_absolute_path = os.path.abspath(branch_file_path)
customer_absolute_path = os.path.abspath(customer_file_path)
credit_absolute_path = os.path.abspath(credit_file_path)


# Read the JSON files into DataFrames
try:
    branch_df = spark.read.json(branch_absolute_path)
    customer_df = spark.read.json(customer_absolute_path)
    credit_df = spark.read.json(credit_absolute_path)
except Exception as e:
    print(f"Error reading JSON file: {e}")
    spark.stop()
    exit(1)

# Define MySQL connection parameters
host = "127.0.0.1"  # Replace with your MySQL host.  Fixed the unterminated string and removed the port.
user = "root"  # Replace with your MySQL user
password = "password"  # Replace with your MySQL password
database = "creditcard_capstone"  # Replace with your MySQL database

# Function to create a MySQL connection
def create_mysql_connection():
    try:
        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        return connection
    except mysql.connector.Error as e:
        print(f"Error connecting to MySQL: {e}")
        return None

# Function to create tables in MySQL
def create_tables(connection):
    if connection is None:
        return

    cursor = connection.cursor()
    try:
        cursor.execute("CREATE TABLE IF NOT EXISTS CDW_SAPP_CUSTOMER (CUST_ID INT PRIMARY KEY, FNAME VARCHAR(255), MNAME VARCHAR(255), LNAME VARCHAR(255), STREET VARCHAR(255), CITY VARCHAR(255), STATE VARCHAR(255), ZIP INT, PHONE VARCHAR(20), EMAIL VARCHAR(255), GENDER VARCHAR(10), DOB DATE)")
        cursor.execute("CREATE TABLE IF NOT EXISTS CDW_SAPP_BRANCH (BRANCH_ID INT PRIMARY KEY, BRANCH_NAME VARCHAR(255), BRANCH_CITY VARCHAR(255), BRANCH_STATE VARCHAR(255))")
        connection.commit()
        print("Tables created successfully.")
    except mysql.connector.Error as e:
        print(f"Error creating tables: {e}")
        connection.rollback()
    finally:
        cursor.close()

# Function to insert data into MySQL tables
def insert_data(connection, customer_df, branch_df):
    if connection is None:
        return

    cursor = connection.cursor()
    try:
        # Insert data into CDW_SAPP_CUSTOMER
        for row in customer_df.collect():
            sql = "INSERT INTO CDW_SAPP_CUSTOMER (CUST_ID, FNAME, MNAME, LNAME, STREET, CITY, STATE, ZIP, PHONE, EMAIL, GENDER, DOB) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            values = (row["cust_id"], row["fname"], row["mname"], row["lname"], row["street"], row["city"], row["state"], row["zip"], row["phone"], row["email"], row["gender"], row["dob"])
            cursor.execute(sql, values)

        # Insert data into CDW_SAPP_BRANCH
        for row in branch_df.collect():
            sql = "INSERT INTO CDW_SAPP_BRANCH (BRANCH_ID, BRANCH_NAME, BRANCH_CITY, BRANCH_STATE) VALUES (%s, %s, %s, %s)"
            values = (row["branch_id"], row["branch_name"], row["branch_city"], row["branch_state"])
            cursor.execute(sql, values)

        connection.commit()
        print("Data inserted successfully.")
    except mysql.connector.Error as e:
        print(f"Error inserting data: {e}")
        connection.rollback()
    finally:
        cursor.close()

# Main function to orchestrate the process
def main():
    # Create a MySQL connection
    mysql_connection = create_mysql_connection()
    if mysql_connection:
        # Create tables in MySQL
        create_tables(mysql_connection)
        # Insert data into the tables
        insert_data(mysql_connection, customer_df, branch_df)
        # Close the connection
        mysql_connection.close()
        print("MySQL connection closed.")
    else:
        print("Failed to establish MySQL connection.")
    spark.stop()

if __name__ == "__main__":
    main()


Tables created successfully.


AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

: 