In [1]:
# Author: iTrue
# Date: 2023-04-14
# Description: Scan WiFi SSIDs

import os
import pandas as pd
import subprocess

def get_wifi_networks():
    cmd = 'netsh wlan show networks mode=Bssid'
    output = subprocess.check_output(cmd, shell=True)
    output = output.decode('utf-8')
    networks = []
    network = {}
    for line in output.split('\n'):
        line = line.strip()
        if line.startswith('SSID'):
            if network:
                networks.append(network)
            network = {}
            ssid = line.split(':')[1].strip()
            network['wifiname'] = ssid
        elif line.startswith('BSSID'):
            bssid = line.split(':')[1].strip()
            network['bssid'] = bssid
        elif line.startswith('Authentication'):
            auth = line.split(':')[1].strip()
            network['authentication'] = auth
    if network:
        networks.append(network)
    df = pd.DataFrame(networks)
    return df

def scan_wifi():
    if os.name == 'nt':
        return get_wifi_networks()

    else:
        import network

        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        nets = wlan.scan()
        ssids = [n[0].decode() for n in nets]
        return ssids
    
if __name__ == '__main__':
    # import pandas as pd
    # # Create an empty DataFrame
    # df = pd.DataFrame(columns=['wifi name', 'ssid'])
    # ssids = scan_wifi()

    # # Create a list of dictionaries for appending data
    # data = [{'wifi name': ssid[0], 'ssid': ssid[1]} for ssid in ssids]

    # # Concatenate the list of dictionaries to the DataFrame
    # df = pd.concat([df, pd.DataFrame(data)], ignore_index=True)

    # # Print the updated DataFrame
    # print(df)
    print(scan_wifi())


  wifiname authentication bssid
0    Triet  WPA2-Personal    80


In [None]:
# Author: iTrue
# Date: 2023-04-14
# Description: Connect to WiFi with Python known SSID and password
# !pip install network
import network

def connect_to_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            pass
    print('network config:', wlan.ifconfig())

Collecting network
  Downloading network-0.1.tar.gz (2.8 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: network
  Building wheel for network (setup.py) ... [?25ldone
[?25h  Created wheel for network: filename=network-0.1-py3-none-any.whl size=3138 sha256=16657ac64b4813cc65f69b40276d5f6399d65d50308c8999732aa347db88ab5b
  Stored in directory: /Users/trieupham/Library/Caches/pip/wheels/3a/9a/a4/341d3b109494a43a5cdd444ca83be3a4bfe8c1267ad9f85332
Successfully built network
Installing collected packages: network
Successfully installed network-0.1


In [7]:
# Author: iTrue
# Date: 2023-04-14
# Scan all devices on the network
# !pip install scapy
# !pip install requests

from scapy.all import ARP, Ether, srp
import requests
# create ARP packet
arp = ARP(pdst="192.168.1.0/24")
# create Ether broadcast packet
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
# combine the two packets
packet = ether/arp

# send the packet and capture response
result = srp(packet, timeout=3, verbose=0)[0]

# parse the response
devices = []
for sent, received in result:
    device = {'ip': received.psrc, 'mac': received.hwsrc}
    try:
        device_name = requests.get(f"https://api.macvendors.com/{received.hwsrc}").text.strip()
    except:
        device_name = 'Unknown'
    device['device_name'] = device_name
    devices.append(device)

# print the devices
print("Available devices in the network:")
print("IP" + " "*18+"MAC"+" "*18+"Device Name")
for device in devices:
    print(f"{device['ip']}\t{device['mac']}\t{device['device_name']}")


Available devices in the network:
IP                  MAC                  Device Name
192.168.1.1	80:69:33:f6:8e:06	HUAWEI TECHNOLOGIES CO.,LTD
192.168.1.22	60:d9:c7:dd:0c:19	Apple, Inc.
192.168.1.34	a2:03:c2:f2:c2:81	{"errors":{"detail":"Not Found"}}
192.168.1.63	9c:b6:d0:ea:77:8d	Rivet Networks
192.168.1.49	c0:b5:d7:0e:c7:63	CHONGQING FUGUI ELECTRONICS CO.,LTD.
192.168.1.68	9a:52:0e:e8:90:2a	{"errors":{"detail":"Not Found"}}
192.168.1.94	9e:f4:0d:33:53:f3	{"errors":{"detail":"Not Found"}}


In [4]:
# Author: iTrue
# Date: 2023-04-14
# Description: Using nmap to scan open ports

!pip install python-nmap
import nmap

# initialize nmap scanner
nm = nmap.PortScanner()

# define the host to scan
target_host = "192.168.1.1"

# perform the scan
nm.scan(target_host, arguments="-sV -p-")

# loop through all the open ports
for port in nm[target_host]["tcp"]:
    if nm[target_host]["tcp"][port]["state"] == "open":
        print(f"Port {port} is open")



Port 53 is open
Port 80 is open
Port 443 is open
Port 27998 is open
Port 37443 is open
Port 37444 is open


In [3]:


from scapy.all import ARP, Ether, srp
import requests

# create ARP packet
arp = ARP(pdst="192.168.1.0/24")
# create Ether broadcast packet
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
# combine the two packets
packet = ether/arp

# send the packet and capture response
result = srp(packet, timeout=3, verbose=0)[0]

# parse the response
devices = []
for sent, received in result:
    device = {'ip': received.psrc, 'mac': received.hwsrc}
    try:
        device_name = requests.get(f"https://api.macvendors.com/{received.hwsrc}").text.strip()
    except:
        device_name = 'Unknown'
    device['device_name'] = device_name
    devices.append(device)

# print the devices in the same format as arp -a command
print("Interface: en0")
for device in devices:
    print(f"{device['ip']}\t{device['mac']}\t{device['device_name']}")


Interface: en0
192.168.1.1	80:69:33:f6:8e:06	HUAWEI TECHNOLOGIES CO.,LTD
192.168.1.63	9c:b6:d0:ea:77:8d	Rivet Networks
192.168.1.68	9a:52:0e:e8:90:2a	{"errors":{"detail":"Not Found"}}


In [2]:
import csv
import random
from datetime import date

# List of CUST_IDs
cust_ids = [
    9300, 9317, 9318, 9180, 9181, 9182, 9183, 9192, 9193, 9194,
    9195, 9196, 9197, 9198, 9199, 9200, 9322, 55, 57, 58, 60, 62,
    64, 9273, 9282, 9283, 9281, 9284, 9285, 9265, 9287, 9308,
    9321, 9280, 9292, 9290, 9293
]

# Generate 100,000 records
num_records = 2000000

# Open the CSV file for writing
filename = r'C:\Users\trieu.pham\Downloads\customer_records_no_error.csv'
with open(filename, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['CUST_ID', 'AMOUNT', 'BUSINESS_DATE'])

    for _ in range(num_records):
        cust_id = random.choice(cust_ids)
        amount = round(random.uniform(1, 1000), 2)
        business_date = date(2023, 6, 6)
        writer.writerow([cust_id, amount, business_date])

print(f'{num_records} records generated and saved to {filename}.')


2000000 records generated and saved to C:\Users\trieu.pham\Downloads\customer_records_no_error.csv.


In [1]:
# Split a CSV file into multiple files based on a column value

import csv
import math

# Define the number of output files
num_files = 30

# Open the input CSV file
input_filename = r'C:\Users\trieu.pham\Downloads\nex_loyalty_files\customer_records_w_errors.csv'
with open(input_filename, 'r') as csvfile:
    reader = csv.reader(csvfile)
    header = next(reader)  # Skip the header row

    # Determine the number of records per file
    total_records = sum(1 for _ in reader)
    records_per_file = math.ceil(total_records / num_files)

    # Reset the reader to the beginning of the file
    csvfile.seek(0)
    next(reader)  # Skip the header row again

    # Iterate over the output files
    for file_index in range(num_files):
        # Generate the output filename
        output_filename = f'output_w_errors_{file_index + 1}.csv'

        # Open the output CSV file
        with open(output_filename, 'w', newline='') as outfile:
            writer = csv.writer(outfile)
            writer.writerow(header)  # Write the header row

            # Write the records for the current file
            for _ in range(records_per_file):
                try:
                    row = next(reader)
                    writer.writerow(row)
                except StopIteration:
                    break

        print(f'{records_per_file} records written to {output_filename}.')

print(f'{total_records} records have been split into {num_files} files.')


66667 records written to output_w_errors_1.csv.
66667 records written to output_w_errors_2.csv.
66667 records written to output_w_errors_3.csv.
66667 records written to output_w_errors_4.csv.
66667 records written to output_w_errors_5.csv.
66667 records written to output_w_errors_6.csv.
66667 records written to output_w_errors_7.csv.
66667 records written to output_w_errors_8.csv.
66667 records written to output_w_errors_9.csv.
66667 records written to output_w_errors_10.csv.
66667 records written to output_w_errors_11.csv.
66667 records written to output_w_errors_12.csv.
66667 records written to output_w_errors_13.csv.
66667 records written to output_w_errors_14.csv.
66667 records written to output_w_errors_15.csv.
66667 records written to output_w_errors_16.csv.
66667 records written to output_w_errors_17.csv.
66667 records written to output_w_errors_18.csv.
66667 records written to output_w_errors_19.csv.
66667 records written to output_w_errors_20.csv.
66667 records written to outp

In [1]:
import csv
import random
from datetime import date

# List of CUST_IDs
cust_ids = [
    '9300', '9317', '9318', '9180', '9181', '9182', '9183', '9192', '9193', '9194',
    '9195', '9196', '9197', '9198', '9199', '9200', '9322', '55', '57', '58', '60', '62',
    '64', '9273', '9282', '9283', '9281', '9284', '9285', '9265', '9287', '9308',
    '9321', '9280', '9292', '9290', '9293', 'AB123', 'XYZ456', '789DEF'
]

# Generate 2 million records
num_records = 2000000

# Open the CSV file for writing
filename = r'C:\Users\trieu.pham\Downloads\customer_records.csv'
with open(filename, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['CUST_ID', 'AMOUNT', 'BUSINESS_DATE'])

    for _ in range(num_records):
        # Generate random CUST_ID (including characters)
        if random.randint(1, 10) == 1:  # 10% chance of incorrect CUST_ID
            cust_id = random.choice(cust_ids)  # Random correct CUST_ID
        else:
            cust_id = ''.join(random.choices(cust_ids, k=random.randint(6, 10)))  # Random incorrect CUST_ID with characters

        # Generate random amount (including incorrect values)
        if random.randint(1, 10) == 1:  # 10% chance of incorrect amount
            amount = round(random.uniform(1001, 9999), 2)  # Random incorrect amount
        else:
            amount = round(random.uniform(1, 1000), 2)  # Random correct amount

        # Generate random BUSINESS_DATE format
        formats = ['%Y/%m/%d', '%Y-%m-%d', '%y-%m-%d', '%Y-%d-%m', '%d/%m/%Y']
        business_date = date.strftime(date.today(), random.choice(formats))

        writer.writerow([cust_id, amount, business_date])

print(f'{num_records} records generated and saved to {filename}.')


2000000 records generated and saved to C:\Users\trieu.pham\Downloads\customer_records.csv.


In [3]:
# Create xml file
import xml.etree.ElementTree as ET

# Create the root element
root = ET.Element("Loyalty")

# Define the structure of a single LoyaltyRequest
loyalty_request_structure = {
    "Type": "IssuePoints",
    "Instrument": {
        "CardNumber": "8100050805433334",
        "InstrumentID": ""
    },
    "ClientUserId": "",
    "ClientComments": "",
    "ReasonCode": "",
    "IssuePointsParameters": {
        "LoyaltyAccountID": "37983",
        "IssueRuleID": "",
        "PointsAmount": "1",
        "EscrowPeriodTypeCode": "",
        "EscrowPeriods": "",
        "BonusPointsFlag": "false"
    },
    "TransactionID": {
        "RetailStoreID": "9999",
        "RegisterID": "",
        "BusinessDayDate": "2023-06-06",
        "SequenceNumber": "0"
    }
}

# Create the LoyaltyRequest elements
for _ in range(100000):
    loyalty_request = ET.SubElement(root, "LoyaltyRequest")
    loyalty_request.set("Type", loyalty_request_structure["Type"])

    instrument = ET.SubElement(loyalty_request, "Instrument")
    card_number = ET.SubElement(instrument, "CardNumber")
    card_number.text = loyalty_request_structure["Instrument"]["CardNumber"]
    instrument_id = ET.SubElement(instrument, "InstrumentID")

    client_user_id = ET.SubElement(loyalty_request, "ClientUserId")
    client_comments = ET.SubElement(loyalty_request, "ClientComments")
    reason_code = ET.SubElement(loyalty_request, "ReasonCode")

    issue_points_parameters = ET.SubElement(loyalty_request, "IssuePointsParameters")
    loyalty_account_id = ET.SubElement(issue_points_parameters, "LoyaltyAccountID")
    loyalty_account_id.text = loyalty_request_structure["IssuePointsParameters"]["LoyaltyAccountID"]
    issue_rule_id = ET.SubElement(issue_points_parameters, "IssueRuleID")
    points_amount = ET.SubElement(issue_points_parameters, "PointsAmount")
    points_amount.text = loyalty_request_structure["IssuePointsParameters"]["PointsAmount"]
    escrow_period_type_code = ET.SubElement(issue_points_parameters, "EscrowPeriodTypeCode")
    escrow_periods = ET.SubElement(issue_points_parameters, "EscrowPeriods")
    bonus_points_flag = ET.SubElement(issue_points_parameters, "BonusPointsFlag")
    bonus_points_flag.text = loyalty_request_structure["IssuePointsParameters"]["BonusPointsFlag"]

    transaction_id = ET.SubElement(loyalty_request, "TransactionID")
    retail_store_id = ET.SubElement(transaction_id, "RetailStoreID")
    retail_store_id.text = loyalty_request_structure["TransactionID"]["RetailStoreID"]
    register_id = ET.SubElement(transaction_id, "RegisterID")
    business_day_date = ET.SubElement(transaction_id, "BusinessDayDate")
    business_day_date.text = loyalty_request_structure["TransactionID"]["BusinessDayDate"]
    sequence_number = ET.SubElement(transaction_id, "SequenceNumber")
    sequence_number.text = loyalty_request_structure["TransactionID"]["SequenceNumber"]

# Create the main root element
main_root = ET.ElementTree(root)

# Save the XML to a file
xml_files = r'C:\Users\trieu.pham\Downloads\cloned_records.xml'
main_root.write(xml_files, encoding="utf-8", xml_declaration=True)

print("XML file with cloned records created successfully.")


XML file with cloned records created successfully.
