# XCP

> Flashing interface for XCP.
> For now, only XCP on CAN via the Kvaser CAN interface is supported.

In [None]:
#| default_exp xcp

In [None]:
#| hide
from nbdev.showdoc import show_doc
from fastcore.test import *
import platform 

In [None]:
#| export
import os
import git
import argparse
from InquirerPy import inquirer
from InquirerPy.validator import EmptyInputValidator
from InquirerPy.base.control import Choice
from pydantic import ValidationError
from pprint import pprint, PrettyPrinter

In [None]:
#| export
import subprocess
from multiprocessing import Manager
from multiprocessing.managers import DictProxy
import cantools
from cantools.database import Message as MessagerTpl
from cantools.database.can.database import Database

In [None]:
#| export
import pandas as pd
import numpy as np
import struct

In [None]:
# #|export
# from candycan.data_link_socketcan import done, send_msg

In [None]:
#| export 
from candycan.a2l import (
    list_of_strings,
    XCPCalib,
    XCPData,
    XCPConfig,
    Get_XCPCalib_From_XCPJSon,
    Generate_Init_XCPData_From_A2L,
)


In [None]:
#| hide
# blue pill: running on a physical linux machine; red pill: GITHUB_ACTIONS is set
blue_pill = not os.getenv("GITHUB_ACTIONS")

In [None]:
#| export
pp = PrettyPrinter(indent=4, width=80, compact=True)

In [None]:
#| export 
# Find the git repository enclosing the current directory (searching parent directories).
repo = git.Repo("./", search_parent_directories=True)
# If the repository found is not "candycan" itself, we are in the parent repo:
# switch to the "candycan" submodule.
if os.path.split(repo.working_dir)[1] != "candycan":
    repo = repo.submodule("candycan").module()
pprint(repo.working_dir)

In [None]:
#| export
def get_argparser() -> argparse.ArgumentParser:
    """Build the argument parser for the XCP/CCP command line interface.

    The default file paths are built from the working directory of the
    module-level ``repo`` object.

    Returns:
        argparse.ArgumentParser: parser exposing the protocol, transfer
            direction, flashing mode, A2L lookup, CAN and file path options.
    """
    parser = argparse.ArgumentParser(description='XCP Processing')

    parser.add_argument(
        '--protocol',
        type=str,
        choices=['ccp', 'xcp'],
        default='ccp',
        help='Protocol to use: ccp/xcp',
    )

    # store_true flag: absent -> False (upload), present -> True (download)
    parser.add_argument(
        '--download',
        default=False,
        help='Download (host->target) when set; default is upload',
        action='store_true',
    )

    # store_false flag: absent -> True (differential flashing on), present -> False
    parser.add_argument(
        '--diff_flashing',
        default=True,
        help='Disable differential flashing (enabled by default)',
        action='store_false',
    )

    parser.add_argument(
        '--a2l',
        type=str,
        default=repo.working_dir+'/res/VBU_AI.json',
        help='a2l json file path',
    )

    parser.add_argument(
        "--node-path",
        type=str,
        default=r"/PROJECT/MODULE[]",
        help="node path to search for calibration parameters",
    )

    # The default is a comma separated string; argparse passes string defaults
    # through `type`, so it is converted by list_of_strings like a user value.
    parser.add_argument(
        "--leaves",
        type=list_of_strings,
        default=", ".join(
            [
                r"TQD_trqTrqSetNormal_MAP_v",
                r"VBU_L045A_CWP_05_09T_AImode_CM_single",
                r"Lookup2D_FLOAT32_IEEE",
                r"Lookup2D_X_FLOAT32_IEEE",
                r"Scalar_FLOAT32_IEEE",
                r"TQD_vVehSpd",
                r"TQD_vSgndSpd_MAP_y",
                r"TQD_pctAccPedPosFlt",
                r"TQD_pctAccPdl_MAP_x",
            ]
        ),
        help="leaf nodes to search for",
    )

    parser.add_argument(
        '--channel',
        type=int,
        default=3,
        help='CAN channel for flashing',
    )

    parser.add_argument(
        '--download_id',
        type=int,
        default=630,
        help='CAN message ID for downloading',
    )

    parser.add_argument(
        '--upload_id',
        type=int,
        default=631,
        help='CAN message ID for uploading',
    )

    parser.add_argument(
        '--input',
        type=str,
        default=repo.working_dir+'/res/download.json',
        help='Input file path',
    )

    parser.add_argument(
        '--output',
        type=str,
        default=repo.working_dir+'/res/output.json',
        help='Output file path',
    )
    return parser

In [None]:
show_doc(get_argparser)

In [None]:
parser = get_argparser()
args = parser.parse_args(
    args=[
        '--protocol', 'xcp',
        '--download',
        '--a2l', repo.working_dir+'/res/VBU_AI.json',
        '--node-path', r'/PROJECT/MODULE[]',
        '--leaves', r'TQD_trqTrqSetNormal_MAP_v, VBU_L045A_CWP_05_09T_AImode_CM_single, Lookup2D_FLOAT32_IEEE, Lookup2D_X_FLOAT32_IEEE, Scalar_FLOAT32_IEEE, TQD_vVehSpd, TQD_vSgndSpd_MAP_y, TQD_pctAccPedPosFlt, TQD_pctAccPdl_MAP_x',
        '--channel', '3',
        '--download_id', '630',
        '--upload_id', '631',
        '--input', repo.working_dir+'/res/download.json',
        '--output', repo.working_dir+'/res/output.json',
    ]
)

In [None]:
xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)

xcp_data = Generate_Init_XCPData_From_A2L(
    a2l=args.a2l, keys=args.leaves, node_path=args.node_path
)

#  address from xcp data file should align with the address from xcp calib file
test_eq(xcp_data.address, xcp_calib_from_xcpjson.data[0].address)

# validate the model
try:
    XCPData.model_validate(xcp_data)
except ValidationError as exc:
    print(exc)

In [None]:
# type(args.channel), type(args.download_id), args.upload_id, args.download, args.diff_flashing

In [None]:
xcp_data.value = xcp_calib_from_xcpjson.data[0].value
pprint(xcp_data)

xcp_calib = XCPCalib(
    config=XCPConfig(
        channel=args.channel, download=str(args.download_id), upload=str(args.upload_id)
    ),
    data=[xcp_data],
)
pprint(xcp_calib)

In [None]:
npa =  xcp_calib.data[0].value_array_view
npa

In [None]:
# buffer = [i.hex() for x in npa for i in x]
# # buffer[::-1]
# len(buffer)
# buffer

In [None]:

# buffer = npa.tobytes()

# pprint(buffer), len(buffer)
# xcp_calib.data[0].value, len(xcp_calib.data[0].value)

In [None]:
addr = bytes('7000aa2a', 'utf-8')
a = 0x7000aa2a
a

In [None]:
npb = npa[::-1]
# npb
buffer = [struct.pack("<f", x) for x in np.nditer(npa)]
# buffer
len(buffer)

In [None]:
#| export
def npa_to_packed_buffer(a: np.ndarray) -> str:
    """Convert a numpy array to a packed hex string buffer for flashing.

    Every element is encoded as a little-endian IEEE-754 float32 and the
    elements are emitted in row-major (C) order of the array's logical shape,
    regardless of how the array is laid out in memory.

    Args:
        a (np.ndarray): input numpy array for flashing

    Returns:
        str: hex string with 8 hex digits per element; empty for an empty array
    """
    # "<f4" pins the byte order so the result does not depend on the host;
    # tobytes(order="C") serializes by logical index, so transposed or strided
    # views are packed in the order they are indexed.
    return np.asarray(a, dtype="<f4").tobytes(order="C").hex()

In [None]:

# buffer = [struct.pack("<f", x).hex() for x in np.nditer(npa)]
# buffer[::-1]
# len(buffer)
# buffer
# data = ''.join(buffer)
data = npa_to_packed_buffer(npa)
test_eq(data, xcp_calib.data[0].value)
# data

In [None]:
# npa.astype(np.float32).tobytes().hex()
buffer = npa.astype(np.float32).tobytes().hex()  ## == npa_to_packed_buffer(npa)
buffer, len(buffer)
test_eq(buffer, xcp_calib.data[0].value)

In [None]:
#| export
def flash_xcp(xcp_calib: XCPCalib, data: pd.DataFrame, diff_flashing: bool=False, download: bool=True):
    """Flash XCP data to target.

    The table in `data` is packed as little-endian float32 in row-major order
    and stored as a hex string in the first data entry of `xcp_calib`.

    Args:
        xcp_calib (XCPCalib): XCP calibration used as template, contains all the
            meta information; its first data entry receives the new value
        data (pd.DataFrame): input XCP data to be flashed, replaces the value in xcp_calib
        diff_flashing (bool): use differential flashing (not implemented yet)
        download (bool): download (host->target) if True, otherwise upload

    Raises:
        NotImplementedError: if differential flashing is requested for a download
    """
    # Reject the unsupported mode before touching xcp_calib.
    if download and diff_flashing:
        raise NotImplementedError("Differential flashing not implemented yet")

    # A DataFrame has no tobytes(); go through its numpy representation.
    packed_hex = np.asarray(data.to_numpy(), dtype="<f4").tobytes(order="C").hex()

    # NOTE(review): only the first data entry is updated, mirroring the single
    # entry calibrations built in this file -- confirm for multi-entry use.
    xcp_calib.data[0].value = packed_hex

    # TODO: the actual transfer to the target is not implemented in this function yet.
        

    

In [None]:
#| export
from scapy.all import *

In [None]:
pkt = IP()
pkt.canvas_dump()

In [None]:
IP()
a = IP(dst="10.10.10.28")
a.dst
a.ttl
ls(IP)

In [None]:
a = IP(ttl=10)
a.src
a.dst="192.168.1.1"
a
Ether()/IP()/TCP()
raw(IP())
# IP(_)
# a = Ether()/IP(dst="www.slashdot.org")/TCP()/"GET /index.html HTTP \n\n"
a = Ether()/IP(dst="www.baidu.com")/TCP()/"GET /index.html HTTP \n\n"
hexdump(a)
b=raw(a)
b
c = Ether(b)
c
c.hide_defaults()
c

In [None]:
os.getcwd()
a = rdpcap('../res/pcaps/ipfix.pcap')
a
# a[0].pdfdump(layer_shift=1)
a[1].psdump("/tmp/ipfix.eps", layer_shift=1)

In [None]:
a=IP(dst="www.baidu.com/30")
a
[p for p in a]

# Caution

Grant the virtualenv's python3 binary the CAP_NET_RAW and CAP_NET_ADMIN capabilities:

```bash
sudo setcap 'CAP_NET_RAW+eip CAP_NET_ADMIN+eip' /dpt/.pyenv/versions/miniconda3-3.11-24.1.2-0/envs/can/bin/python3.11
```

In [None]:
# sniff(filter="icmp and host 10.10.10.28", count=2)

In [None]:
# Bring up the virtual CAN interface vcan0.
# Not a virtual machine (blue pill): sudo is fed through sshpass from the
# gpg-encrypted ~/.sshpasswd.gpg. Virtual machine (Github workflow): plain sudo.
sudo_cmd = "gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo" if blue_pill else "sudo"
vcan_setup_cmds = (
    f"{sudo_cmd} modprobe vcan",
    f"{sudo_cmd} ip link add dev vcan0 type vcan",
    "ip link show vcan0",
    # vcan does not support setting the bitrate on the command line
    f"{sudo_cmd} ip link set up vcan0",
)
for shell_cmd in vcan_setup_cmds:
    os.system(shell_cmd)
    

In [None]:
load_layer("can")
conf.contribs['CANSocket'] = {'use-python-can': False}
load_contrib("cansocket")

socket = CANSocket(channel='vcan0',
                receive_own_messages=True)

In [None]:
packet = CAN(identifier=0x123, data=b'12345678')
packet.show2()

In [None]:

socket.send(packet)
rx_packet = socket.recv()
rx_packet.show2()


# CCP via Scapy

In [None]:
load_contrib("automotive.ccp")


In [None]:
pkt = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02)
pkt.show2()

In [None]:
pkt = CCP(identifier=0x711)/CRO(ctr=2)/GET_SEED(resource=2)
pkt.show2()

In [None]:
pkt = CCP(identifier=0x711)/CRO(ctr=3)/UNLOCK(key=b"123456")
pkt.show2()

In [None]:
pkt = CCP(identifier=0x711)/CRO(ctr=1)/GET_DAQ_SIZE()
sock = CANSocket(bustype='socketcan', channel='vcan0', receive_own_messages=True)

In [None]:
## another socket in the same process cannot receive the packet sent by the first socket
# socket2 = CANSocket(channel='vcan0')

In [None]:
## same socket cannot receive the packet sent by itself
# rx_packet = socket2.recv()

In [None]:
# rx_packet.show2()

In [None]:
# socket.sr1(packet, timeout=1)

In [None]:
# rx_packet = socket.recv()
wrpcap("./scapypcaptest.pcap", packet)

In [None]:

# Tear down the virtual CAN interface: set vcan0 down first, then delete it.
# Same sudo handling as for the setup: sshpass + gpg on a physical machine,
# plain sudo otherwise.
sudo_cmd = "gpg -d -q ~/.sshpasswd.gpg | sshpass -v sudo" if blue_pill else "sudo"
for link_action in ("set down", "delete"):
    os.system(f"{sudo_cmd} ip link {link_action} vcan0")

In [None]:
#| export
if __name__ == "__main__" and "__file__" in globals():  # only run if this file is called directly
    # Interactive front end: ask for the settings, overlay them on the parsed
    # command line arguments, then assemble the XCP calibration object.

    protocol = inquirer.select(
        message="What's the protocol?",
        choices=[
            Choice(value="ccp", name="CCP"),
            Choice(value="xcp", name="XCP"),
        ],
        default="ccp",
    ).execute()

    download = inquirer.confirm(
        message="Downloading(host->target)?",
        confirm_letter="y",
        reject_letter="n",
        default=True,
    ).execute()

    differential_flashing = inquirer.confirm(
        message="Differential Flashing?",
        confirm_letter="y",
        reject_letter="n",
        default=False,
    ).execute()

    # Path relative to the repository root; the default matches the file name
    # used by the argument parser default.
    a2l_file_path = inquirer.text(
        message="a2l file path",
        validate=EmptyInputValidator(),
        default='/res/VBU_AI.json'
    ).execute()

    # Node path and leaves are not prompted for; the parser values are used.

    can_channel = inquirer.number(
        message="CAN channel for flashing",
        min_allowed=0,
        max_allowed=32,
        validate=EmptyInputValidator(),
        default=3,
    ).execute()

    download_id = inquirer.number(
        message="CAN ID for downloading",
        min_allowed=0,
        max_allowed=9999,
        validate=EmptyInputValidator(),
        default=630,
    ).execute()

    upload_id = inquirer.number(
        message="CAN ID for uploading",
        min_allowed=0,
        max_allowed=9999,
        validate=EmptyInputValidator(),
        default=631,
    ).execute()

    input_file_path = inquirer.text(
        message="Input file path",
        validate=EmptyInputValidator(),
        default="/res/download.json",
    ).execute()

    output_file_path = inquirer.text(
        message="Output file path",
        validate=EmptyInputValidator(),
        default="/res/output.json",
    ).execute()

    args = get_argparser().parse_args()
    args.protocol = protocol
    args.download = download
    args.diff_flashing = differential_flashing
    # The prompted path is relative to the repository root, like input/output.
    args.a2l = repo.working_dir+a2l_file_path
    # Normalize the prompt results to int, the type the parser produces for
    # these options, whatever type the prompt returns.
    args.channel = int(can_channel)
    args.download_id = int(download_id)
    args.upload_id = int(upload_id)
    args.input = repo.working_dir+input_file_path
    args.output = repo.working_dir+output_file_path
    pprint(args)

    xcp_calib_from_xcpjson = Get_XCPCalib_From_XCPJSon(args.input)
    xcp_data = Generate_Init_XCPData_From_A2L(
        a2l=args.a2l, keys=args.leaves, node_path=args.node_path
    )
    # Best effort: a validation problem is reported but does not abort the run.
    try:
        XCPData.model_validate(xcp_data)
    except ValidationError as exc:
        print(exc)

    xcp_data.value = xcp_calib_from_xcpjson.data[0].value
    pprint(xcp_data)

    # Same argument types as the notebook usage of XCPConfig in this file:
    # integer channel, CAN IDs passed as strings.
    xcp_calib = XCPCalib(
        config=XCPConfig(
            channel=args.channel,
            download=str(args.download_id),
            upload=str(args.upload_id),
        ),
        data=[xcp_data],
    )
    pprint(xcp_calib)

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()